Skip to content

Commit

Permalink
KAFKA-4667: Connect uses AdminClient to create internal topics when n…
Browse files Browse the repository at this point in the history
…eeded (KIP-154)

The backing store for offsets, status, and configs now attempts to use the new AdminClient to look up the internal topics and create them if they don’t yet exist. If the necessary APIs are not available in the connected broker, the stores fall back to the old behavior of relying upon auto-created topics. Kafka Connect requires a minimum of Apache Kafka 0.10.0.1-cp1, and the AdminClient can work with all versions since 0.10.0.0.

All three of Connect’s internal topics are created as compacted topics, and new distributed worker configuration properties control the replication factor for all three topics and the number of partitions for the offsets and status topics; the config topic requires a single partition and does not allow it to be set via configuration. All of these new configuration properties have sensible defaults, meaning users can upgrade without having to change any of the existing configurations. In most situations, existing Connect deployments will have already created the storage topics prior to upgrading.

The replication factor defaults to 3, so anyone running Kafka clusters with fewer nodes than 3 will receive an error unless they explicitly set the replication factor for the three internal topics. This is actually desired behavior, since it signals the users that they should be aware they are not using sufficient replication for production use.

The integration tests use a cluster with a single broker, so they were changed to explicitly specify a replication factor of 1 and a single partition.

The `KafkaAdminClientTest` was refactored to extract a utility for setting up a `KafkaAdminClient` with a `MockClient` for unit tests.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#2984 from rhauch/kafka-4667
  • Loading branch information
rhauch authored and ewencp committed May 18, 2017
1 parent 30736da commit 56623ef
Show file tree
Hide file tree
Showing 18 changed files with 828 additions and 123 deletions.
20 changes: 20 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public String name() {
return name;
}

public int partitions() {
return numPartitions;
}

public short replicationFactor() {
return replicationFactor;
}

/**
* Set the configuration to use on the new topic.
*
Expand All @@ -82,4 +90,16 @@ TopicDetails convertToTopicDetails() {
}
}
}

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(name=").append(name).
append(", numPartitions=").append(numPartitions).
append(", replicationFactor=").append(replicationFactor).
append(", replicasAssignments=").append(replicasAssignments).
append(", configs=").append(configs).
append(")");
return bld.toString();
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Time;

import java.util.HashMap;
import java.util.Map;

/**
* Simple utility for setting up a mock {@link KafkaAdminClient} that uses a {@link MockClient} for a supplied
* {@link Cluster}. Create a {@link Cluster} manually or use {@link org.apache.kafka.test.TestUtils} methods to
* easily create a simple cluster.
* <p>
* To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses
* for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient
* and receive the responses you provided.
* <p>
* When finished, be sure to {@link #close() close} the environment object.
*/
public class MockKafkaAdminClientEnv implements AutoCloseable {
private final AdminClientConfig adminClientConfig;
private final Metadata metadata;
private final MockClient mockClient;
private final KafkaAdminClient client;
private final Cluster cluster;

public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
this(cluster, newStrMap(vals));
}

public MockKafkaAdminClientEnv(Cluster cluster, Map<String, Object> config) {
this.adminClientConfig = new AdminClientConfig(config);
this.cluster = cluster;
this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
}

public Cluster cluster() {
return cluster;
}

public AdminClient adminClient() {
return client;
}

public MockClient kafkaClient() {
return mockClient;
}

@Override
public void close() {
this.client.close();
}

private static Map<String, Object> newStrMap(String... vals) {
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
if (vals.length % 2 != 0) {
throw new IllegalStateException();
}
for (int i = 0; i < vals.length; i += 2) {
map.put(vals[i], vals[i + 1]);
}
return map;
}
}
9 changes: 9 additions & 0 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -69,6 +70,10 @@ public class TestUtils {
public static final Random RANDOM = new Random();
public static final long DEFAULT_MAX_WAIT_MS = 15000;

public static Cluster singletonCluster() {
return clusterWith(1);
}

public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts) {
return clusterWith(1, topicPartitionCounts);
}
Expand All @@ -77,6 +82,10 @@ public static Cluster singletonCluster(final String topic, final int partitions)
return clusterWith(1, topic, partitions);
}

public static Cluster clusterWith(int nodes) {
return clusterWith(nodes, new HashMap<String, Integer>());
}

public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {
final Node[] ns = new Node[nodes];
for (int i = 0; i < nodes; i++)
Expand Down
37 changes: 30 additions & 7 deletions config/connect-distributed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
# limitations under the License.
##

# These are defaults. This file just demonstrates how to override some settings.
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
Expand All @@ -30,22 +34,41 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
# The internal converter used for offsets, config, and status data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset, config, and status data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,49 @@ public class DistributedConfig extends WorkerConfig {
* <code>offset.storage.topic</code>
*/
public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to store connector offsets in";
private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector offsets are stored";

/**
* <code>offset.storage.partitions</code>
*/
public static final String OFFSET_STORAGE_PARTITIONS_CONFIG = "offset.storage.partitions";
private static final String OFFSET_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the offset storage topic";

/**
* <code>offset.storage.replication.factor</code>
*/
public static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG = "offset.storage.replication.factor";
private static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the offset storage topic";

/**
* <code>config.storage.topic</code>
*/
public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
private static final String CONFIG_TOPIC_CONFIG_DOC = "kafka topic to store configs";
private static final String CONFIG_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector configurations are stored";

/**
* <code>config.storage.replication.factor</code>
*/
public static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG = "config.storage.replication.factor";
private static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the configuration storage topic";

/**
* <code>status.storage.topic</code>
*/
public static final String STATUS_STORAGE_TOPIC_CONFIG = "status.storage.topic";
public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to track connector and task status";
public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector and task status are stored";

/**
* <code>status.storage.partitions</code>
*/
public static final String STATUS_STORAGE_PARTITIONS_CONFIG = "status.storage.partitions";
private static final String STATUS_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the status storage topic";

/**
* <code>status.storage.replication.factor</code>
*/
public static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = "status.storage.replication.factor";
private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the status storage topic";

static {
CONFIG = baseConfigDef()
Expand Down Expand Up @@ -209,14 +239,44 @@ public class DistributedConfig extends WorkerConfig {
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
OFFSET_STORAGE_TOPIC_CONFIG_DOC)
.define(OFFSET_STORAGE_PARTITIONS_CONFIG,
ConfigDef.Type.INT,
25,
atLeast(1),
ConfigDef.Importance.LOW,
OFFSET_STORAGE_PARTITIONS_CONFIG_DOC)
.define(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG,
ConfigDef.Type.SHORT,
(short) 3,
atLeast(1),
ConfigDef.Importance.LOW,
OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
.define(CONFIG_TOPIC_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
CONFIG_TOPIC_CONFIG_DOC)
.define(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG,
ConfigDef.Type.SHORT,
(short) 3,
atLeast(1),
ConfigDef.Importance.LOW,
CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
.define(STATUS_STORAGE_TOPIC_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
STATUS_STORAGE_TOPIC_CONFIG_DOC);
STATUS_STORAGE_TOPIC_CONFIG_DOC)
.define(STATUS_STORAGE_PARTITIONS_CONFIG,
ConfigDef.Type.INT,
5,
atLeast(1),
ConfigDef.Importance.LOW,
STATUS_STORAGE_PARTITIONS_CONFIG_DOC)
.define(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG,
ConfigDef.Type.SHORT,
(short) 3,
atLeast(1),
ConfigDef.Importance.LOW,
STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC);
}

public DistributedConfig(Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -39,6 +40,7 @@
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -403,7 +405,7 @@ public void putTargetState(String connector, TargetState state) {
}

// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerConfig config) {
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand All @@ -415,12 +417,29 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerCo
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
Map<String, Object> adminProps = new HashMap<>(config.originals());
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(1).
replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
build();

return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}

private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM);
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = new Runnable() {
@Override
public void run() {
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
admin.createTopics(topicDescription);
}
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}

@SuppressWarnings("unchecked")
Expand Down
Loading

0 comments on commit 56623ef

Please sign in to comment.