Skip to content

Commit

Permalink
KAFKA-2371: Add distributed support for Copycat.
Browse files Browse the repository at this point in the history
This adds coordination between DistributedHerders using the generalized consumer
support, allowing automatic balancing of connectors and tasks across workers. A
few pieces that require interaction between workers (resolving config
inconsistencies, forwarding of configuration changes to the leader worker) are
incomplete because they require REST API support to implement properly.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson, Gwen Shapira

Closes apache#321 from ewencp/kafka-2371-distributed-herder
  • Loading branch information
ewencp authored and gwenshap committed Oct 23, 2015
1 parent 21443f2 commit 2e61773
Show file tree
Hide file tree
Showing 34 changed files with 2,966 additions and 696 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ project(':copycat:runtime') {
testCompile "$easymock"
testCompile "$powermock"
testCompile "$powermock_easymock"
testCompile project(':clients').sourceSets.test.output
testRuntime "$slf4jlog4j"
testRuntime project(":copycat:json")
}
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
<allow pkg="org.apache.kafka.copycat.data" />
<allow pkg="org.apache.kafka.copycat.errors" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.test"/>

<subpackage name="source">
<allow pkg="org.apache.kafka.copycat.connector" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
Expand Down Expand Up @@ -78,37 +78,4 @@ public String name() {
return "roundrobin";
}

private static class CircularIterator<T> implements Iterator<T> {
int i = 0;
private List<T> list;

public CircularIterator(List<T> list) {
if (list.isEmpty()) {
throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
}
this.list = list;
}

@Override
public boolean hasNext() {
return true;
}

@Override
public T next() {
T next = list.get(i);
i = (i + 1) % list.size();
return next;
}

public T peek() {
return list.get(i);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void resetGeneration() {
this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
rejoinNeeded = true;
}

private boolean needsOnLeave = true;
/**
* Ensure that the group is active (i.e. joined and synced)
*/
Expand All @@ -208,7 +208,10 @@ public void ensureActiveGroup() {
return;

// onLeave only invoked if we have a valid current generation
onLeave(generation, memberId);
if (needsOnLeave) {
onLeave(generation, memberId);
needsOnLeave = false;
}

while (needRejoin()) {
ensureCoordinatorKnown();
Expand All @@ -225,6 +228,7 @@ public void ensureActiveGroup() {

if (future.succeeded()) {
onJoin(generation, memberId, protocol, future.value());
needsOnLeave = true;
heartbeatTask.reset();
} else {
if (future.exception() instanceof UnknownMemberIdException)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.common.utils;

import java.util.Iterator;
import java.util.List;

public class CircularIterator<T> implements Iterator<T> {
int i = 0;
private List<T> list;

public CircularIterator(List<T> list) {
if (list.isEmpty()) {
throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
}
this.list = list;
}

@Override
public boolean hasNext() {
return true;
}

@Override
public T next() {
T next = list.get(i);
i = (i + 1) % list.size();
return next;
}

public T peek() {
return list.get(i);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
2 changes: 2 additions & 0 deletions config/copycat-distributed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

group.id=copycat-cluster

# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.copycat.json.JsonConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@
public class FileStreamSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);

private String filename;
private PrintStream outputStream;

public FileStreamSinkTask() {
}

// for testing
public FileStreamSinkTask(PrintStream outputStream) {
filename = null;
this.outputStream = outputStream;
}

@Override
public void start(Properties props) {
String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
Expand All @@ -65,16 +67,24 @@ public void start(Properties props) {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
log.trace("Writing line to {}: {}", logFilename(), record.value());
outputStream.println(record.value());
}
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
log.trace("Flushing output stream for {}", logFilename());
outputStream.flush();
}

@Override
public void stop() {
if (outputStream != System.out)
outputStream.close();
}

private String logFilename() {
return filename == null ? "stdout" : filename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void start(Properties props) {
}
topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new CopycatException("ConsoleSourceTask config missing topic setting");
throw new CopycatException("FileStreamSourceTask config missing topic setting");
}

@Override
Expand Down Expand Up @@ -88,6 +88,7 @@ public List<SourceRecord> poll() throws InterruptedException {
streamOffset = 0L;
}
reader = new BufferedReader(new InputStreamReader(stream));
log.debug("Opened {} for reading", logFilename());
} catch (FileNotFoundException e) {
log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
synchronized (this) {
Expand All @@ -113,6 +114,7 @@ public List<SourceRecord> poll() throws InterruptedException {
int nread = 0;
while (readerCopy.ready()) {
nread = readerCopy.read(buffer, offset, buffer.length - offset);
log.trace("Read {} bytes from {}", nread, logFilename());

if (nread > 0) {
offset += nread;
Expand All @@ -126,6 +128,7 @@ public List<SourceRecord> poll() throws InterruptedException {
do {
line = extractLine();
if (line != null) {
log.trace("Read a line from {}", logFilename());
if (records == null)
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
Expand Down Expand Up @@ -183,10 +186,12 @@ public void stop() {
log.trace("Stopping");
synchronized (this) {
try {
stream.close();
log.trace("Closed input stream");
if (stream != null && stream != System.in) {
stream.close();
log.trace("Closed input stream");
}
} catch (IOException e) {
log.error("Failed to close ConsoleSourceTask stream: ", e);
log.error("Failed to close FileStreamSourceTask stream: ", e);
}
this.notify();
}
Expand All @@ -199,4 +204,8 @@ private Map<String, String> offsetKey(String filename) {
private Map<String, Long> offsetValue(Long pos) {
return Collections.singletonMap(POSITION_FIELD, pos);
}

private String logFilename() {
return filename == null ? "stdin" : filename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static void main(String[] args) throws Exception {
Properties workerProps;
Properties connectorProps;

if (args.length < 2) {
log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]");
if (args.length < 1) {
log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]");
System.exit(1);
}

Expand All @@ -58,8 +58,7 @@ public static void main(String[] args) throws Exception {

WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
DistributedHerder herder = new DistributedHerder(worker);
herder.configure(workerConfig.originals());
DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());
final Copycat copycat = new Copycat(worker, herder);
copycat.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class ConnectorConfig extends AbstractConfig {
static {
config = new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
.define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.copycat.runtime;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.HashMap;
import java.util.Map;

/**
* <p>
* Configuration options for Tasks. These only include Copycat system-level configuration
* options.
* </p>
*/
public class TaskConfig extends AbstractConfig {

public static final String TASK_CLASS_CONFIG = "task.class";
private static final String TASK_CLASS_DOC =
"Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task";

private static ConfigDef config;

static {
config = new ConfigDef()
.define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
}

public TaskConfig() {
this(new HashMap<String, String>());
}

public TaskConfig(Map<String, ?> props) {
super(config, props);
}
}
Loading

0 comments on commit 2e61773

Please sign in to comment.