Skip to content

Commit

Permalink
Merge pull request apache#597 from zongtanghu/develop
Browse files Browse the repository at this point in the history
[ISSUE apache#525] Support the message track. Merge to branch msg_track
  • Loading branch information
dongeforever authored Dec 10, 2018
2 parents 7fd657f + 47556d6 commit c50ada6
Show file tree
Hide file tree
Showing 33 changed files with 2,124 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
return response;
}

if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is user self defined topic and this node is trace broker!";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}

try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ public TopicConfigManager(BrokerController brokerController) {
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
}

public boolean isSystemTopic(final String topic) {
Expand Down Expand Up @@ -154,6 +164,10 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
if (topicConfig != null)
return topicConfig;

if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
return topicConfig;
}

TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
Expand All @@ -29,13 +30,20 @@
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

Expand All @@ -56,6 +64,8 @@
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private final InternalLogger log = ClientLogger.getLog();

/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
Expand Down Expand Up @@ -246,6 +256,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;

/**
* Interface of asynchronous transfer data
*/
private AsyncDispatcher traceDispatcher = null;

/**
* Default constructor.
*/
Expand All @@ -267,6 +282,39 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (msgTraceSwitch) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;

this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

/**
* Constructor specifying RPC hook.
*
Expand All @@ -276,6 +324,16 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}

/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch);
}

/**
* Constructor specifying consumer group.
*
Expand Down Expand Up @@ -518,6 +576,15 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

/**
Expand All @@ -526,6 +593,9 @@ public void start() throws MQClientException {
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}

@Override
Expand Down Expand Up @@ -694,4 +764,12 @@ public long getConsumeTimeout() {
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}

public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}

public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
Expand All @@ -33,6 +40,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand All @@ -56,6 +64,8 @@
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {

private final InternalLogger log = ClientLogger.getLog();

/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
Expand Down Expand Up @@ -119,11 +129,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M

/**
* Interface of asynchronous transfer data
*/
private AsyncDispatcher traceDispatcher = null;

/**
* Default constructor.
*/
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
}

/**
Expand All @@ -137,6 +152,37 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

/**
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature
if (msgTraceSwitch) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;

this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTrackHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

/**
* Constructor specifying producer group.
*
Expand All @@ -147,8 +193,18 @@ public DefaultMQProducer(final String producerGroup) {
}

/**
* Constructor specifying the RPC hook.
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace.
*/
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) {
this(producerGroup, null, msgTraceSwitch);
}

/**
* Constructor specifying the RPC hook.
*
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(RPCHook rpcHook) {
Expand All @@ -170,6 +226,15 @@ public DefaultMQProducer(RPCHook rpcHook) {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

/**
Expand All @@ -178,6 +243,9 @@ public void start() throws MQClientException {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}

/**
Expand Down Expand Up @@ -787,4 +855,12 @@ public int getRetryTimesWhenSendAsyncFailed() {
public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}

public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}

public void setTraceDispatcher(AsyncDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher;
}
}
Loading

0 comments on commit c50ada6

Please sign in to comment.