forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
KAFKA-4667: Connect uses AdminClient to create internal topics when n…
…eeded (KIP-154) The backing store for offsets, status, and configs now attempts to use the new AdminClient to look up the internal topics and create them if they don’t yet exist. If the necessary APIs are not available in the connected broker, the stores fall back to the old behavior of relying upon auto-created topics. Kafka Connect requires a minimum of Apache Kafka 0.10.0.1-cp1, and the AdminClient can work with all versions since 0.10.0.0. All three of Connect’s internal topics are created as compacted topics, and new distributed worker configuration properties control the replication factor for all three topics and the number of partitions for the offsets and status topics; the config topic requires a single partition and does not allow it to be set via configuration. All of these new configuration properties have sensible defaults, meaning users can upgrade without having to change any of the existing configurations. In most situations, existing Connect deployments will have already created the storage topics prior to upgrading. The replication factor defaults to 3, so anyone running Kafka clusters with fewer nodes than 3 will receive an error unless they explicitly set the replication factor for the three internal topics. This is actually desired behavior, since it signals the users that they should be aware they are not using sufficient replication for production use. The integration tests use a cluster with a single broker, so they were changed to explicitly specify a replication factor of 1 and a single partition. The `KafkaAdminClientTest` was refactored to extract a utility for setting up a `KafkaAdminClient` with a `MockClient` for unit tests. Author: Randall Hauch <rhauch@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes apache#2984 from rhauch/kafka-4667
- Loading branch information
Showing
18 changed files
with
828 additions
and
123 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
151 changes: 62 additions & 89 deletions
151
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Large diffs are not rendered by default.
Oops, something went wrong.
87 changes: 87 additions & 0 deletions
87
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.clients.admin; | ||
|
||
import org.apache.kafka.clients.Metadata; | ||
import org.apache.kafka.clients.MockClient; | ||
import org.apache.kafka.common.Cluster; | ||
import org.apache.kafka.common.utils.Time; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* Simple utility for setting up a mock {@link KafkaAdminClient} that uses a {@link MockClient} for a supplied | ||
* {@link Cluster}. Create a {@link Cluster} manually or use {@link org.apache.kafka.test.TestUtils} methods to | ||
* easily create a simple cluster. | ||
* <p> | ||
* To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses | ||
* for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient | ||
* and receive the responses you provided. | ||
* <p> | ||
* When finished, be sure to {@link #close() close} the environment object. | ||
*/ | ||
public class MockKafkaAdminClientEnv implements AutoCloseable { | ||
private final AdminClientConfig adminClientConfig; | ||
private final Metadata metadata; | ||
private final MockClient mockClient; | ||
private final KafkaAdminClient client; | ||
private final Cluster cluster; | ||
|
||
public MockKafkaAdminClientEnv(Cluster cluster, String...vals) { | ||
this(cluster, newStrMap(vals)); | ||
} | ||
|
||
public MockKafkaAdminClientEnv(Cluster cluster, Map<String, Object> config) { | ||
this.adminClientConfig = new AdminClientConfig(config); | ||
this.cluster = cluster; | ||
this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), | ||
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); | ||
this.mockClient = new MockClient(Time.SYSTEM, this.metadata); | ||
this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata); | ||
} | ||
|
||
public Cluster cluster() { | ||
return cluster; | ||
} | ||
|
||
public AdminClient adminClient() { | ||
return client; | ||
} | ||
|
||
public MockClient kafkaClient() { | ||
return mockClient; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
this.client.close(); | ||
} | ||
|
||
private static Map<String, Object> newStrMap(String... vals) { | ||
Map<String, Object> map = new HashMap<>(); | ||
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); | ||
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); | ||
if (vals.length % 2 != 0) { | ||
throw new IllegalStateException(); | ||
} | ||
for (int i = 0; i < vals.length; i += 2) { | ||
map.put(vals[i], vals[i + 1]); | ||
} | ||
return map; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.