
Add mechanism to manage offsets through KafkaConnector and KafkaMirrorMaker2 resources #10563

Merged: 7 commits merged into strimzi:main on Sep 18, 2024

Conversation

@katheris (Contributor) commented Sep 10, 2024:

Type of change

Select the type of your PR

  • Enhancement / new feature

Description

Add a mechanism to manage offsets through the KafkaConnector and KafkaMirrorMaker2 resources.
Users can now list, alter, and reset offsets for their connectors using CRDs.
This is done via annotations to trigger the action and ConfigMaps for the list output and the alter input.
This PR implements the approaches agreed in proposals 076 and 083.
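As a rough sketch of the mechanism described here (the annotation key and exact field layout follow proposals 076 and 083 and may differ slightly from the final API; the resource and ConfigMap names are hypothetical), listing offsets for a connector could look like:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-connector              # hypothetical connector name
  labels:
    strimzi.io/cluster: my-connect
  annotations:
    # Triggers the "list" action on the next reconciliation;
    # "alter" and "reset" are the other expected values.
    strimzi.io/connector-offsets: list
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  listOffsets:
    toConfigMap:                  # the operator writes the listed offsets here
      name: my-connector-offsets
```

After reconciling, the operator writes the offsets into the referenced ConfigMap and removes the annotation; for alter, the offsets are read from a ConfigMap instead, and the connector must be stopped first.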

Checklist

Please go through this checklist and make sure all applicable tasks have been done

  • Write tests
  • Make sure all tests pass
  • Update documentation
  • Check RBAC rights for Kubernetes / OpenShift roles
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md
  • Supply screenshots for visual changes, such as Grafana dashboards

@katheris force-pushed the connectorOffsetManagement branch 6 times, most recently from b493fcb to 50dbb76 on September 11, 2024 at 13:17
Commit: Add mechanism to manage offsets through KafkaConnector and KafkaMirrorMaker2 resources
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
@katheris force-pushed the connectorOffsetManagement branch from 50dbb76 to fa913c5 on September 11, 2024 at 14:03
@katheris katheris marked this pull request as ready for review September 11, 2024 15:40
@katheris katheris added this to the 0.44.0 milestone Sep 11, 2024
@katheris katheris requested a review from a team September 11, 2024 15:41
@scholzj (Member) commented Sep 11, 2024:

/azp run regression

Azure Pipelines successfully started running 1 pipeline(s).

private Map<String, Object> additionalProperties;

@Description("Reference to the ConfigMap where the list of offsets will be written to.")
@JsonProperty(value = "toConfigMap", required = true)
Member:

Why don't you just name it toConfigMap / getToConfigMap / setToConfigMap?

Contributor Author:

I was trying to provide a more descriptive method name just to make the code more readable; happy to change it though if you think it's better to stay closer to the actual property name.

Member:

I personally find it much better when the names follow the API names. It makes it easiest to align the Java and the CRD models, and it causes a lot less confusion when checking some YAML against the Java code, etc. I also think we usually don't map the property like this unless it conflicts with some Java keyword. So I personally would change this.

Member:

Alternatively, I would personally also be fine with changing the API against the proposal and naming the field simply configMap. It might make the name less weird and possibly allow reusing the same class for both listing and setting the offsets.

Contributor Author:

I've switched it to setToConfigMap and getToConfigMap

*/
package io.strimzi.api.kafka.model.connector;

public enum KafkaConnectorOffsetsAnnotation {
Member:

I guess I can accept that we have the annotations enum in the api. But it should probably have only the valid values and not none or unknown, no? Why are those needed?

Contributor Author:

I added them to use when processing the annotation in the code, but in reality I'm only using none as the default when fetching the annotation so I'll remove unknown. I placed this class in api as I figured it's part of the API, but it's actually only used from the cluster-operator module, so should I actually move it there?

Member:

I was wondering if it should be in the api. I'm personally not a big fan of it. But we already do have KafkaRebalanceAnnotation as an example of this being in the api module before. So I felt like I should not insist on this being moved (funny enough, it also includes the same options you have there which I did not notice yesterday 🙄).

I think the logic of having it in the API is that users might use it to set the annotation programmatically. But then it would be better if it contained the real values that the user is expected to use and not the none and unknown values.

So, I don't know ... I would do it differently. But it does not feel fair to insist you change it if you are happy with it 🤷.

@ppatierno (Member), Sep 16, 2024:

That's a fair point, Jakub. I was thinking about the same (while working on auto-rebalancing): values like refresh, approve, and stop are needed by users, but none and unknown are there just as internal values for the rebalance logic. Maybe Kate could address this here (now) and we should do the same on the rebalance operator side.

Contributor Author:

@ppatierno I've removed unknown since I don't use that internally. When you say "Maybe Kate could address this here (now)" do you mean moving the class to cluster-operator module?

Member:

unknown was removed but none is still there, which is not a valid annotation value: it's just the absence of one.
IIRC the idea of having these annotations in the api module was to provide enums to a user developing automation around a KafkaRebalance resource, so instead of writing a string they can use our enum. Of course, having none (and unknown) brings confusion. So either we put none somewhere else and leave the enum in the api module, or we remove the enum from the api module entirely (and do the same for the rebalance operator), leaving users to use the api module but plain strings when applying these annotations.

Contributor Author:

I've moved it to io.strimzi.operator.cluster.operator.resource in the cluster-operator module; if you're both happy with that I'll mark this thread as resolved.

@scholzj (Member), Sep 16, 2024:

You mean the whole annotation class? Would io.strimzi.operator.cluster.model be better? Or at least io.strimzi.operator.cluster.operator.assembly?

Member:

Agree on the model package.

Comment on lines 874 to 895
abstract KafkaConnectorOffsetsAnnotation getConnectorOffsetsAnnotation(CustomResource resource, String connectorName);

/**
* Patches the custom resource to remove the connector-offsets annotation
*
* @param reconciliation Reconciliation marker
* @param resource Custom resource from which the annotation should be removed
*
* @return Future that indicates the operation completion
*/
@SuppressWarnings({ "rawtypes" })
abstract Future<Void> removeConnectorOffsetsAnnotation(Reconciliation reconciliation, CustomResource resource);

/**
* Returns the key to use for either writing connector offsets to a ConfigMap or fetching connector offsets
* from a ConfigMap.
*
* @param connectorName Name of the connector that is being managed.
*
* @return The String to use when interacting with ConfigMap resources.
*/
abstract String getConnectorOffsetsConfigMapEntryKey(String connectorName);
Member:

The names with Annotation in singular sound strange if you then actually use multiple of them. Maybe we can try to find some more neutral names?

Contributor Author:

What about getConnectorOffsetsAnnotation -> getConnectorOffsetsOperation or getConnectorOffsetsAction?
And removeConnectorOffsetsAnnotation -> removeConnectorOffsetsAnnotations?

Contributor Author:

We could even rename the enum KafkaConnectorOffsetsAnnotation to KafkaConnectorOffsetsOperation

Member:

Yeah, I think calling it operation or action would make it a bit more clear. Not sure I would change the KafkaConnectorOffsetsAnnotation name, as that is the enum with the possible annotation values, no? So it makes sense for it to end with Annotation.

Contributor Author:

I've renamed them

@see-quick (Member) left a comment:

Not a SME but the code looks good apart from Jakub's comments. Thanks.

Moreover, I was thinking about the STs we should write to cover this feature. The first (basic) one that came to my mind was to test all operations (i.e., list, alter, and reset offsets): one test for Connector and one for MM2. WDYT?

        if (!(path instanceof String state)) {
            return Future.failedFuture("JSON response lacked $.connector.state");
        }
        if (!"STOPPED".equals(state)) {
Member:

Do we have some Enum instead of this magic constant? I assume you can use io.strimzi.api.kafka.model.common.ConnectorState? :) The same applies in previous parts.

Contributor Author:

The tricky part here is that the ConnectorState enum values are based on the CRD field, which is lowercase, whereas the state returned from the Connect API is uppercase. @scholzj @mimaison what do you think of adding a fromStatus method to io.strimzi.api.kafka.model.common.ConnectorState that we can call directly here to convert the state to the ConnectorState enum? I noticed that the code style dislikes using toUpperCase and toLowerCase.

Member:

The tricky part here is that the ConnectorState enum values are based on the CRD field, which is lowercase, whereas the state returned from the Connect API is uppercase. @scholzj @mimaison what do you think of adding a fromStatus method to io.strimzi.api.kafka.model.common.ConnectorState that we can call directly here to convert the state to the ConnectorState enum?

I'm not sure I would add a special method like that to the api module. But maybe you can simply convert it to lowercase in the forValue method? We do that in other places ... ProcessRoles, NodeAddressType, etc.

I noticed that the code styles dislikes using toUpperCase and toLowerCase

You should be able to use them, but you should specify the locale:

toLowerCase(Locale.ENGLISH)

Contributor Author:

Ah I misunderstood the code style error then, I've updated it to do the toLowerCase
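The toLowerCase fix discussed here can be sketched as follows. This is a simplified, hypothetical stand-in for io.strimzi.api.kafka.model.common.ConnectorState (the real class lives in the api module); it only illustrates case-insensitive lookup with an explicit Locale, which satisfies the code style check:

```java
import java.util.Locale;

// Hypothetical, simplified stand-in for io.strimzi.api.kafka.model.common.ConnectorState.
enum ConnectorState {
    PAUSED, STOPPED, RUNNING;

    // Accepts both the lowercase CRD spelling ("stopped") and the uppercase
    // Connect REST API spelling ("STOPPED") by normalizing the input with an
    // explicit Locale before matching.
    static ConnectorState forValue(String value) {
        if (value == null) {
            return null;
        }
        switch (value.toLowerCase(Locale.ENGLISH)) {
            case "paused":  return PAUSED;
            case "stopped": return STOPPED;
            case "running": return RUNNING;
            default:        return null;
        }
    }
}

public class ConnectorStateDemo {
    public static void main(String[] args) {
        // Both spellings resolve to the same enum constant.
        System.out.println(ConnectorState.forValue("STOPPED")); // prints STOPPED
        System.out.println(ConnectorState.forValue("stopped")); // prints STOPPED
    }
}
```

Using an explicit Locale avoids surprises in locales with unusual case mappings (the classic Turkish dotless-i problem), which is exactly why the checkstyle rule flags the bare toLowerCase() overload.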

@katheris (Contributor Author) commented:

I forgot to build and push the Helm changes from switching to LocalObjectReference; I'll try to find some time over the weekend to do the full build and push those.


.compose(offsets -> generateListOffsetsConfigMap(configMapName, connectorName, resource, offsets))
.compose(configMap -> configMapOperations.reconcile(reconciliation, resource.getMetadata().getNamespace(), configMapName, configMap))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.compose(v -> Future.succeededFuture(conditions), throwable -> {
Member:

Could onSuccess and onFailure be used to make it more readable? No need to change, just an idea.

@katheris (Contributor Author), Sep 17, 2024:

Using onSuccess and onFailure doesn't work because in the failure case we still want to return a succeeded future, so we need to pass a mapper, whereas onFailure takes a handler, so the method would still return a failed future. Similarly for onSuccess we have to map the result, not just register a handler.

I could change it to use map and otherwise?

        return apiClient.getConnectorOffsets(reconciliation, host, port, connectorName)
                .compose(offsets -> generateListOffsetsConfigMap(configMapName, connectorName, resource, offsets))
                .compose(configMap -> configMapOperations.reconcile(reconciliation, resource.getMetadata().getNamespace(), configMapName, configMap))
                .compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
                .map(v -> conditions)
                .otherwise(throwable -> {
                    // Don't fail reconciliation on error from listing offsets - add a warning and repeat list on next reconcile
                    String message = "Encountered error listing connector offsets. " + throwable.getMessage();
                    LOGGER.warnCr(reconciliation, message);
                    conditions.add(StatusUtils.buildWarningCondition("ListOffsets", message));
                    return conditions;
                });

Member:

I like it, I think it's more readable but I will leave the final call to you.

Contributor Author:

I've updated it

.compose(v -> getOffsetsForAlterRequest(configMapNamespace, configMapName, getConnectorOffsetsConfigMapEntryKey(connectorName)))
.compose(offsets -> apiClient.alterConnectorOffsets(reconciliation, host, port, connectorName, offsets))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
.compose(v -> Future.succeededFuture(conditions), throwable -> {
Member:

Ditto as above.

@scholzj scholzj linked an issue Sep 16, 2024 that may be closed by this pull request
@scholzj (Member) commented Sep 17, 2024:

/azp run regression

Azure Pipelines successfully started running 1 pipeline(s).

@see-quick (Member) left a comment:

LGTM, thanks Kate! 👍

@scholzj scholzj merged commit f0de958 into strimzi:main Sep 18, 2024
21 checks passed
@scholzj (Member) commented Sep 18, 2024:

Thanks for the PR @katheris


Successfully merging this pull request may close these issues.

[Enhancement]: Support resetting offsets for KafkaConnect
4 participants