Skip to content

Commit

Permalink
version upgrade, sample code optimization.
Browse files Browse the repository at this point in the history
  • Loading branch information
bilahepan committed Nov 16, 2021
1 parent adedd2c commit 10b1b5e
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 115 deletions.
2 changes: 1 addition & 1 deletion open-mq-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.5.0</version>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,39 @@

import com.tuya.open.sdk.mq.MqConfigs;
import com.tuya.open.sdk.mq.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(ConsumerExample.class);

public static void main(String[] args) throws Exception {
String url = MqConfigs.CN_SERVER_URL;
String accessId = "";
String accessKey = "";

MqConsumer mqConsumer = MqConsumer.build().serviceUrl(url).accessId(accessId).accessKey(accessKey)
.maxRedeliverCount(3).messageListener(message -> {
System.out.println("---------------------------------------------------");
System.out.println("Message received:" + new String(message.getData()) + ",seq="
+ message.getSequenceId() + ",time=" + message.getPublishTime() + ",consumed time="
+ System.currentTimeMillis());
String jsonMessage = new String(message.getData());
MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
System.out.println("the real message data:" + AESBase64Utils.decrypt(vo.getData(), accessKey.substring(8, 24)));
}
private static String URL = MqConfigs.CN_SERVER_URL;
private static String ACCESS_ID = "";
private static String ACCESS_KEY = "";

);
public static void main(String[] args) throws Exception {
MqConsumer mqConsumer = MqConsumer.build().serviceUrl(URL).accessId(ACCESS_ID).accessKey(ACCESS_KEY)
.messageListener(message -> {
System.out.println("---------------------------------------------------");
System.out.println("Message received:" + new String(message.getData()) + ",time="
+ message.getPublishTime() + ",consumed time=" + System.currentTimeMillis());
String payload = new String(message.getData());
payloadHandler(payload);
});
mqConsumer.start();
}

/**
* This method is used to process your message business
*/
private static void payloadHandler(String payload) {
try {
MessageVO messageVO = JSON.parseObject(payload, MessageVO.class);
//decryption data
String dataJsonStr = AESBase64Utils.decrypt(messageVO.getData(), ACCESS_KEY.substring(8, 24));
System.out.println("messageVO=" + messageVO.toString() + "\n" + "data after decryption dataJsonStr=" + dataJsonStr);
} catch (Exception e) {
logger.error("payload=" + payload + "; your business processing exception, please check and handle. e=", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.tuya.open.sdk.example;

import com.alibaba.fastjson.JSONObject;

import java.io.Serializable;

/**
Expand Down Expand Up @@ -59,12 +61,6 @@ public void setT(Long t) {

@Override
public String toString() {
return "MessageVO{" +
"data='" + data + '\'' +
", protocol=" + protocol +
", pv='" + pv + '\'' +
", sign='" + sign + '\'' +
", t=" + t +
'}';
return JSONObject.toJSONString(this);
}
}
175 changes: 84 additions & 91 deletions open-mq-sdk/src/main/java/com/tuya/open/sdk/mq/MqConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package com.tuya.open.sdk.mq;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -11,95 +10,89 @@

public class MqConsumer {

private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);

private String serviceUrl;

private String accessId;

private String accessKey;

private MqEnv env = MqEnv.PROD;

private int maxRedeliverCount = 3;

private IMessageListener messageListener;

public static MqConsumer build() {
return new MqConsumer();
}

public MqConsumer serviceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public MqConsumer accessId(String accessId) {
this.accessId = accessId;
return this;
}

public MqConsumer accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public MqConsumer env(MqEnv env) {
this.env = env;
return this;
}

public MqConsumer maxRedeliverCount(int maxRedeliverCount) {
this.maxRedeliverCount = maxRedeliverCount;
return this;
}

public MqConsumer messageListener(IMessageListener messageListener) {
this.messageListener = messageListener;
return this;
}

/**
* Start consumer
*
* @throws Exception
*/
public void start() throws Exception {
if (serviceUrl == null || serviceUrl.trim().length() == 0) {
throw new IllegalStateException("serviceUrl must be initialized");
}
if (accessId == null || accessId.trim().length() == 0) {
throw new IllegalStateException("accessId must be initialized");
}
if (accessKey == null || accessKey.trim().length() == 0) {
throw new IllegalStateException("accessKey must be initialized");
}
if (messageListener == null) {
throw new IllegalStateException("messageListener must be initialized");
}
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).allowTlsInsecureConnection(true)
.authentication(new MqAuthentication(accessId, accessKey)).build();
Consumer consumer = client.newConsumer().topic(String.format("%s/out/%s", accessId, env.getValue()))
.subscriptionName(String.format("%s-sub", accessId)).subscriptionType(SubscriptionType.Failover)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliverCount).build()).subscribe();
do {
try {
Message message = consumer.receive();
Long s = System.currentTimeMillis();
messageListener.onMessageArrived(message);
if (MqConfigs.DEBUG) {
logger.info("business processing cost={}", System.currentTimeMillis() - s);
}
consumer.acknowledge(message);
} catch (Throwable t) {
logger.error("error:", t);
}
} while (true);
}

public interface IMessageListener {

void onMessageArrived(Message message) throws Exception;
}
private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);

private String serviceUrl;

private String accessId;

private String accessKey;

private MqEnv env = MqEnv.PROD;

private IMessageListener messageListener;

public static MqConsumer build() {
return new MqConsumer();
}

public MqConsumer serviceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public MqConsumer accessId(String accessId) {
this.accessId = accessId;
return this;
}

public MqConsumer accessKey(String accessKey) {
this.accessKey = accessKey;
return this;
}

public MqConsumer env(MqEnv env) {
this.env = env;
return this;
}


public MqConsumer messageListener(IMessageListener messageListener) {
this.messageListener = messageListener;
return this;
}

/**
* Start consumer
*
* @throws Exception
*/
public void start() throws Exception {
if (serviceUrl == null || serviceUrl.trim().length() == 0) {
throw new IllegalStateException("serviceUrl must be initialized");
}
if (accessId == null || accessId.trim().length() == 0) {
throw new IllegalStateException("accessId must be initialized");
}
if (accessKey == null || accessKey.trim().length() == 0) {
throw new IllegalStateException("accessKey must be initialized");
}
if (messageListener == null) {
throw new IllegalStateException("messageListener must be initialized");
}
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).allowTlsInsecureConnection(true)
.authentication(new MqAuthentication(accessId, accessKey)).build();
Consumer consumer = client.newConsumer().topic(String.format("%s/out/%s", accessId, env.getValue()))
.subscriptionName(String.format("%s-sub", accessId)).subscriptionType(SubscriptionType.Failover)
.autoUpdatePartitions(Boolean.FALSE).subscribe();
do {
try {
Message message = consumer.receive();
Long s = System.currentTimeMillis();
messageListener.onMessageArrived(message);
if (MqConfigs.DEBUG) {
logger.info("business processing cost={}", System.currentTimeMillis() - s);
}
consumer.acknowledge(message);
} catch (Throwable t) {
logger.error("error:", t);
}
} while (true);
}

public interface IMessageListener {

void onMessageArrived(Message message) throws Exception;
}

}

0 comments on commit 10b1b5e

Please sign in to comment.