Skip to content

Commit

Permalink
Fix break changes in namespace offload policy and RabbitMQ sink. (#7011)
Browse files Browse the repository at this point in the history
### Motivation

Fix break changes in namespace offload policy and RabbitMQ sink.
  • Loading branch information
codelipenghui authored May 22, 2020
1 parent 812b8f2 commit 0905260
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public class RabbitMQAbstractConfig implements Serializable {
help = "The password used to authenticate to RabbitMQ")
private String password = "guest";

@FieldDoc(
required = false,
defaultValue = "",
help = "The RabbitMQ queue name from which messages should be read from or written to")
private String queueName;

@FieldDoc(
required = false,
defaultValue = "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
Expand All @@ -49,6 +50,7 @@ public class RabbitMQSink implements Sink<byte[]> {
private Channel rabbitMQChannel;
private RabbitMQSinkConfig rabbitMQSinkConfig;
private String exchangeName;
private String defaultRoutingKey;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
Expand All @@ -63,17 +65,26 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
);

exchangeName = rabbitMQSinkConfig.getExchangeName();
defaultRoutingKey = rabbitMQSinkConfig.getRoutingKey();
String exchangeType = rabbitMQSinkConfig.getExchangeType();

rabbitMQChannel = rabbitMQConnection.createChannel();
rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
String queueName = rabbitMQSinkConfig.getQueueName();
if (StringUtils.isNotEmpty(queueName)) {
rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, defaultRoutingKey);
} else {
rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
}
}

@Override
public void write(Record<byte[]> record) {
byte[] value = record.getValue();
try {
rabbitMQChannel.basicPublish(exchangeName, record.getProperties().get("routingKey"), null, value);
String routingKey = record.getProperties().get("routingKey");
rabbitMQChannel.basicPublish(exchangeName, StringUtils.isEmpty(routingKey) ? defaultRoutingKey : routingKey, null, value);
record.ack();
} catch (IOException e) {
record.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serial
help = "The exchange to publish the messages on")
private String exchangeName;

@FieldDoc(
required = false,
defaultValue = "",
help = "The routing key used for publishing the messages")
private String routingKey;

@FieldDoc(
required = false,
defaultValue = "topic",
Expand Down

0 comments on commit 0905260

Please sign in to comment.