Skip to content

Commit

Permalink
Block options and not prefixes in Kafka Connector configs (#10503)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Sep 2, 2024
1 parent 57352a8 commit d0cb635
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@ToString(callSuper = true)
public abstract class AbstractConnectorSpec extends Spec {
/**
* Forbidden options in the connector configuration
* Forbidden options in the connector configuration => these are full options and not prefixes
*/
public static final String FORBIDDEN_PARAMETERS = "name, connector.class, tasks.max";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.OrderedProperties;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -34,126 +33,85 @@ public AbstractConfiguration(AbstractConfiguration configuration) {
options.addMapPairs(configuration.asOrderedProperties().asMap());
}

/**
* Constructor used to instantiate this class from String configuration. Should be used to create configuration
* from the Assembly.
*
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with with key=value
* pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
*/
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from String configuration. Should be used to create configuration
* from the Assembly.
*
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with with key=value
* pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes) {
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions) {
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param defaults Properties object with default options
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which
* start with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, Map<String, String> defaults) {
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions, forbiddenOptions);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with
* key=value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which
* start with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param defaults Properties object with default options
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, Map<String, String> defaults) {
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions);
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions, forbiddenOptions);
}

/**
* Filters forbidden values from the configuration.
*
* @param reconciliation The reconciliation
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param reconciliation The reconciliation
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start
* with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
*/
private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions) {
private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions) {
// We filter the prefixes first
options.filter(k -> forbiddenPrefixes.stream().anyMatch(s -> {
boolean forbidden = k.toLowerCase(Locale.ENGLISH).startsWith(s);

if (forbidden) {
if (forbiddenPrefixExceptions.contains(k))
forbidden = false;
}

if (forbidden) {
LOGGER.warnCr(reconciliation, "Configuration option \"{}\" is forbidden and will be ignored", k);
} else {
LOGGER.traceCr(reconciliation, "Configuration option \"{}\" is allowed and will be passed to the assembly", k);
}

return forbidden;
}));
}

private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes) {
this.filterForbidden(reconciliation, forbiddenPrefixes, Collections.emptyList());
// Then we filter the forbidden options
if (!forbiddenOptions.isEmpty()) {
options.filter(k -> forbiddenOptions.stream().anyMatch(s -> {
boolean forbidden = k.toLowerCase(Locale.ENGLISH).equals(s);

if (forbidden) {
LOGGER.warnCr(reconciliation, "Configuration option \"{}\" is forbidden and will be ignored", k);
} else {
LOGGER.traceCr(reconciliation, "Configuration option \"{}\" is allowed and will be passed to the assembly", k);
}

return forbidden;
}));
}
}

/**
Expand Down Expand Up @@ -208,8 +166,9 @@ public String getConfiguration() {
}

/**
* Get access to underlying key-value pairs. Any changes to OrderedProperties will be reflected in
* in value returned by subsequent calls to getConfiguration()
* Get access to underlying key-value pairs. Any changes to OrderedProperties will be reflected in value returned
* by subsequent calls to getConfiguration()
*
* @return A map of keys to values.
*/
public OrderedProperties asOrderedProperties() {
Expand All @@ -222,7 +181,7 @@ public OrderedProperties asOrderedProperties() {
* @param prefixes String with comma-separated items
* @return List with the values as separate items
*/
protected static List<String> splitPrefixesToList(String prefixes) {
protected static List<String> splitPrefixesOrOptionsToList(String prefixes) {
return asList(prefixes.split("\\s*,+\\s*"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeAdminClientConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeAdminClientConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeAdminClientConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeConsumerConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeConsumerConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeConsumerConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeProducerConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeProducerConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeProducerConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class KafkaConfiguration extends AbstractConfiguration {
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
}

/**
Expand Down Expand Up @@ -212,11 +212,11 @@ public KafkaConfiguration(KafkaConfiguration configuration) {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), Map.of());
}

private KafkaConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
super(reconciliation, configuration, forbiddenPrefixes);
super(reconciliation, configuration, forbiddenPrefixes, List.of(), List.of(), Map.of());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class KafkaConnectConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaConnectSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaConnectSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaConnectSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaConnectSpec.FORBIDDEN_PREFIX_EXCEPTIONS);

DEFAULTS = new HashMap<>(6);
DEFAULTS.put("group.id", "connect-cluster");
Expand All @@ -41,6 +41,6 @@ public class KafkaConnectConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConnectConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,17 @@
import io.strimzi.api.kafka.model.connector.AbstractConnectorSpec;
import io.strimzi.operator.common.Reconciliation;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Class for handling KafkaConnector configuration passed by the user
*/
public class KafkaConnectorConfiguration extends AbstractConfiguration {
private static final List<String> FORBIDDEN_PREFIXES;
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;
private static final Map<String, String> DEFAULTS;
private static final List<String> FORBIDDEN_OPTIONS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(AbstractConnectorSpec.FORBIDDEN_PARAMETERS);
FORBIDDEN_PREFIX_EXCEPTIONS = List.of();
DEFAULTS = new HashMap<>(0);
FORBIDDEN_OPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(AbstractConnectorSpec.FORBIDDEN_PARAMETERS);
}

/**
Expand All @@ -34,6 +29,6 @@ public class KafkaConnectorConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConnectorConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, List.of(), List.of(), FORBIDDEN_OPTIONS, Map.of());
}
}
Loading

0 comments on commit d0cb635

Please sign in to comment.