Skip to content

Commit

Permalink
FileSourceTask added a judgment whether the topic is null
Browse files Browse the repository at this point in the history
2.FileSourceTask added a judgment whether the topic is null

Co-authored-by: wangkai <wangkai@zhongan.com>
  • Loading branch information
yiduwangkai and wangkai authored Mar 30, 2023
1 parent 1b8881f commit b144d40
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,10 @@
package org.apache.rocketmq.replicator;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;

import org.mockito.Mockito;
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
<!-- compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<commons-lang3.version>3.12.0</commons-lang3.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -197,6 +198,11 @@
<artifactId>maven-artifact</artifactId>
<version>${maven-artifact.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public WorkerSourceTask(WorkerConfig workerConfig,
@Nullable
private static String overwriteTopicFromRecord(ConnectRecord record) {
KeyValue extensions = record.getExtensions();
if (extensions == null) {
log.error("record extensions null , lack of topic config");
return null;
}
String o = extensions.getString(TOPIC, null);
if (null == o) {
log.error("Partition map element topic is null , lack of topic config");
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-connect-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,7 +148,9 @@ public class FileSourceTask extends SourceTask {
fields.add(field);
schema.setFields(fields);
ConnectRecord connectRecord = new ConnectRecord(offsetKey(fileConfig.getFilename()), offsetValue(streamOffset), System.currentTimeMillis(), schema, line);
connectRecord.addExtension("topic", fileConfig.getTopic());
if (StringUtils.isNoneBlank(fileConfig.getTopic())) {
connectRecord.addExtension("topic", fileConfig.getTopic());
}
records.add(connectRecord);
if (records.size() >= batchSize) {
return records;
Expand Down

0 comments on commit b144d40

Please sign in to comment.