Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block options and not prefixes in Kafka Connector configs #10503

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@ToString(callSuper = true)
public abstract class AbstractConnectorSpec extends Spec {
/**
* Forbidden options in the connector configuration
* Forbidden options in the connector configuration => these are full options and not prefixes
*/
public static final String FORBIDDEN_PARAMETERS = "name, connector.class, tasks.max";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.OrderedProperties;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -34,126 +33,85 @@ public AbstractConfiguration(AbstractConfiguration configuration) {
options.addMapPairs(configuration.asOrderedProperties().asMap());
}

/**
* Constructor used to instantiate this class from String configuration. Should be used to create configuration
* from the Assembly.
*
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with with key=value
* pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
*/
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from String configuration. Should be used to create configuration
* from the Assembly.
*
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with with key=value
* pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes) {
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions) {
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param defaults Properties object with default options
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which
* start with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, Map<String, String> defaults) {
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions, forbiddenOptions);
}

/**
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
* ConfigMap / CRD.
*
* @param reconciliation The reconciliation
* @param jsonOptions Json object with configuration options as key ad value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param reconciliation The reconciliation
* @param configuration Configuration in String format. Should contain zero or more lines with
* key=value pairs.
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which
* start with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param defaults Properties object with default options
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
* @param defaults Properties object with default options
*/
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, Map<String, String> defaults) {
public AbstractConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions, Map<String, String> defaults) {
options.addMapPairs(defaults);
options.addIterablePairs(jsonOptions);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions);
options.addStringPairs(configuration);
filterForbidden(reconciliation, forbiddenPrefixes, forbiddenPrefixExceptions, forbiddenOptions);
}

/**
* Filters forbidden values from the configuration.
*
* @param reconciliation The reconciliation
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start with one of
* these prefixes will be ignored.
* @param reconciliation The reconciliation
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start
* with one of these prefixes will be ignored.
* @param forbiddenPrefixExceptions Exceptions excluded from forbidden prefix options checking
* @param forbiddenOptions List with the exact configuration options (not prefixes) that should not be used
*/
private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions) {
private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes, List<String> forbiddenPrefixExceptions, List<String> forbiddenOptions) {
// We filter the prefixes first
options.filter(k -> forbiddenPrefixes.stream().anyMatch(s -> {
boolean forbidden = k.toLowerCase(Locale.ENGLISH).startsWith(s);

if (forbidden) {
if (forbiddenPrefixExceptions.contains(k))
forbidden = false;
}

if (forbidden) {
LOGGER.warnCr(reconciliation, "Configuration option \"{}\" is forbidden and will be ignored", k);
} else {
LOGGER.traceCr(reconciliation, "Configuration option \"{}\" is allowed and will be passed to the assembly", k);
}

return forbidden;
}));
}

private void filterForbidden(Reconciliation reconciliation, List<String> forbiddenPrefixes) {
this.filterForbidden(reconciliation, forbiddenPrefixes, Collections.emptyList());
// Then we filter the forbidden options
if (!forbiddenOptions.isEmpty()) {
options.filter(k -> forbiddenOptions.stream().anyMatch(s -> {
boolean forbidden = k.toLowerCase(Locale.ENGLISH).equals(s);

if (forbidden) {
LOGGER.warnCr(reconciliation, "Configuration option \"{}\" is forbidden and will be ignored", k);
} else {
LOGGER.traceCr(reconciliation, "Configuration option \"{}\" is allowed and will be passed to the assembly", k);
}

return forbidden;
}));
}
}

/**
Expand Down Expand Up @@ -208,8 +166,9 @@ public String getConfiguration() {
}

/**
* Get access to underlying key-value pairs. Any changes to OrderedProperties will be reflected in
* in value returned by subsequent calls to getConfiguration()
* Get access to underlying key-value pairs. Any changes to OrderedProperties will be reflected in value returned
* by subsequent calls to getConfiguration()
*
* @return A map of keys to values.
*/
public OrderedProperties asOrderedProperties() {
Expand All @@ -222,7 +181,7 @@ public OrderedProperties asOrderedProperties() {
* @param prefixes String with comma-separated items
* @return List with the values as separate items
*/
protected static List<String> splitPrefixesToList(String prefixes) {
protected static List<String> splitPrefixesOrOptionsToList(String prefixes) {
return asList(prefixes.split("\\s*,+\\s*"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeAdminClientConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeAdminClientSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeAdminClientConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeAdminClientConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeConsumerConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeConsumerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeConsumerConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeConsumerConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class KafkaBridgeProducerConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeProducerSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
DEFAULTS = new HashMap<>(0);
}

Expand All @@ -35,6 +35,6 @@ public class KafkaBridgeProducerConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaBridgeProducerConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class KafkaConfiguration extends AbstractConfiguration {
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
}

/**
Expand Down Expand Up @@ -212,11 +212,11 @@ public KafkaConfiguration(KafkaConfiguration configuration) {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), Map.of());
}

private KafkaConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
super(reconciliation, configuration, forbiddenPrefixes);
super(reconciliation, configuration, forbiddenPrefixes, List.of(), List.of(), Map.of());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class KafkaConnectConfiguration extends AbstractConfiguration {
private static final Map<String, String> DEFAULTS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(KafkaConnectSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesToList(KafkaConnectSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaConnectSpec.FORBIDDEN_PREFIXES);
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaConnectSpec.FORBIDDEN_PREFIX_EXCEPTIONS);

DEFAULTS = new HashMap<>(6);
DEFAULTS.put("group.id", "connect-cluster");
Expand All @@ -41,6 +41,6 @@ public class KafkaConnectConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConnectConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,17 @@
import io.strimzi.api.kafka.model.connector.AbstractConnectorSpec;
import io.strimzi.operator.common.Reconciliation;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Class for handling KafkaConnector configuration passed by the user
*/
public class KafkaConnectorConfiguration extends AbstractConfiguration {
private static final List<String> FORBIDDEN_PREFIXES;
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;
private static final Map<String, String> DEFAULTS;
private static final List<String> FORBIDDEN_OPTIONS;

static {
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(AbstractConnectorSpec.FORBIDDEN_PARAMETERS);
FORBIDDEN_PREFIX_EXCEPTIONS = List.of();
DEFAULTS = new HashMap<>(0);
FORBIDDEN_OPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(AbstractConnectorSpec.FORBIDDEN_PARAMETERS);
}

/**
Expand All @@ -34,6 +29,6 @@ public class KafkaConnectorConfiguration extends AbstractConfiguration {
* @param jsonOptions Json object with configuration options as key ad value pairs.
*/
public KafkaConnectorConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, DEFAULTS);
super(reconciliation, jsonOptions, List.of(), List.of(), FORBIDDEN_OPTIONS, Map.of());
}
}
Loading
Loading