Skip to content

Commit

Permalink
[Issue apache#745] fix the ack bugs and cloudevent message resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo authored and xwm1992 committed Feb 16, 2022
1 parent c0f4ce9 commit 21aaa89
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.eventmesh.protocol.cloudevents.resolver.grpc;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
Expand Down Expand Up @@ -56,22 +57,42 @@ public static CloudEvent buildEvent(SimpleMessage message) {
String producerGroup = StringUtils.isEmpty(message.getProducerGroup())
? event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : message.getProducerGroup();

return CloudEventBuilder.from(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl).build();
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
return CloudEventBuilder.v1(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl).build();
} else {
return CloudEventBuilder.v03(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl).build();
}

}

public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
Expand Down Expand Up @@ -158,21 +179,38 @@ public static List<CloudEvent> buildBatchEvents(BatchMessage batchMessage) {
String username = StringUtils.isEmpty(header.getUsername()) ? event.getExtension(ProtocolKey.USERNAME).toString() : header.getUsername();
String passwd = StringUtils.isEmpty(header.getPassword()) ? event.getExtension(ProtocolKey.PASSWD).toString() : header.getPassword();

CloudEvent enhancedEvent = CloudEventBuilder.from(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
CloudEvent enhancedEvent = CloudEventBuilder.v1(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
cloudEvents.add(enhancedEvent);
} else {
CloudEvent enhancedEvent = CloudEventBuilder.v03(event)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.build();
cloudEvents.add(enhancedEvent);
}

cloudEvents.add(enhancedEvent);
}
return cloudEvents;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public AbstractPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<Ab
this.handleMsgContext = handleMsgContext;
this.waitingRequests = waitingRequests;

this.eventMeshConsumer = handleMsgContext.getEventMeshConsumer();
this.eventMeshGrpcConfiguration = handleMsgContext.getEventMeshGrpcServer().getEventMeshGrpcConfiguration();
this.grpcRetryer = handleMsgContext.getEventMeshGrpcServer().getGrpcRetryer();
CloudEvent event = handleMsgContext.getEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,23 @@ public void run() {
}

private T buildMessage(SimpleMessage simpleMessage) {
try {
if (EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME.equals(protocolType)) {
if (EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME.equals(protocolType)) {
String content = simpleMessage.getContent();
if (content.contains(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)) {
String contentType = simpleMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, JsonFormat.CONTENT_TYPE);
return (T) EventFormatProvider.getInstance().resolveFormat(contentType)
.deserialize(simpleMessage.getContent().getBytes(StandardCharsets.UTF_8));

try {
return (T) EventFormatProvider.getInstance().resolveFormat(contentType)
.deserialize(content.getBytes(StandardCharsets.UTF_8));
} catch (Throwable t) {
logger.warn("Error in building message. {}", t.getMessage());
return null;
}
} else {
return null;
}
} else {
return (T) simpleMessage;
} catch (Throwable t) {
logger.warn("Error in building message. {}", t.getMessage());
return null;
}
}
}
Expand Down

0 comments on commit 21aaa89

Please sign in to comment.