Skip to content

Commit

Permalink
Cleanup of IO protocol code
Browse files Browse the repository at this point in the history
  • Loading branch information
richturner committed Jan 11, 2022
1 parent 6926689 commit 65a4690
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected UDPIOClient<ArtnetPacket> doCreateIoClient() throws Exception {
protected Supplier<ChannelHandler[]> getEncoderDecoderProvider() {
return () ->
new ChannelHandler[] {
new AbstractNettyIOClient.MessageToByteEncoder<>(ArtnetPacket.class, client.ioClient, ArtnetPacket::toByteBuf)
new AbstractNettyIOClient.MessageToByteEncoder<>(ArtnetPacket.class, client, ArtnetPacket::toByteBuf)
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,24 @@ public static Supplier<ChannelHandler[]> getGenericStringEncodersAndDecoders(Abs
}

public static final Logger LOG = SyslogCategory.getLogger(PROTOCOL, AbstractIOClientProtocol.class);
protected ProtocolIOClient<V, W> client;
protected W client;

protected AbstractIOClientProtocol(U agent) {
super(agent);
}

@Override
public String getProtocolInstanceUri() {
return client != null ? client.ioClient.getClientUri() : "";
return client != null ? client.getClientUri() : "";
}

@Override
protected void doStop(Container container) throws Exception {
if (client != null) {
LOG.fine("Stopping IO client for protocol: " + this);
client.removeAllMessageConsumers();
client.removeAllConnectionStatusConsumers();
AbstractIOClientProtocol.LOG.info("Disconnecting IO client");
client.disconnect();
}
client = null;
Expand All @@ -149,7 +152,7 @@ protected void doStop(Container container) throws Exception {
protected void doStart(Container container) throws Exception {
try {
client = createIoClient();
LOG.fine("Created IO client '" + client.ioClient.getClientUri() + "' for protocol: " + this);
LOG.fine("Created IO client '" + client.getClientUri() + "' for protocol: " + this);
client.connect();
} catch (Exception e) {
LOG.log(Level.WARNING, "Failed to create IO client for protocol: " + this, e);
Expand All @@ -167,18 +170,25 @@ protected void doLinkedAttributeWrite(Attribute<?> attribute, X agentLink, Attri
V message = createWriteMessage(attribute, agent.getAgentLink(attribute), event, processedValue);

if (message == null) {
LOG.fine("No message produced for attribute event so not sending to IO client '" + client.ioClient.getClientUri() + "': " + event);
LOG.fine("No message produced for attribute event so not sending to IO client '" + client.getClientUri() + "': " + event);
return;
}

client.send(message);
AbstractIOClientProtocol.LOG.finer("Sending message to IO client: " + client.getClientUri());
client.sendMessage(message);
}

protected ProtocolIOClient<V, W> createIoClient() throws Exception {
protected W createIoClient() throws Exception {
W client = doCreateIoClient();
ProtocolIOClient<V, W> protocolIoClient = new ProtocolIOClient<>(client, this::onConnectionStatusChanged, this::onMessageReceived);
this.client = protocolIoClient;
return protocolIoClient;

if (client == null) {
throw new IllegalStateException("IO client for protocol should not be null");
}

client.addConnectionStatusConsumer(this::onConnectionStatusChanged);
client.addMessageConsumer(this::onMessageReceived);
this.client = client;
return client;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ protected AbstractNettyIOClientProtocol(U agent) {
}

@Override
protected ProtocolIOClient<V, W> createIoClient() throws Exception {
ProtocolIOClient<V, W> protocolIOClient = super.createIoClient();
protected W createIoClient() throws Exception {
W client = super.createIoClient();
Supplier<ChannelHandler[]> encoderDecoderProvider = getEncoderDecoderProvider();
protocolIOClient.ioClient.setEncoderDecoderProvider(encoderDecoderProvider);
client.setEncoderDecoderProvider(encoderDecoderProvider);
return client;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ protected boolean doClientSubscription(String topic, Set<Consumer<MQTTMessage<S>
}
};

if (this.connectionStatus != ConnectionStatus.CONNECTED) {
// Just return true and let connection logic sort out actual subscription
return true;
}

try {
Mqtt3SubAck subAck = client.subscribeWith()
.topicFilter(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.openremote.agent.protocol.mqtt;

import org.apache.http.client.utils.URIBuilder;
import org.openremote.agent.protocol.io.ProtocolIOClient;
import org.openremote.container.util.UniqueIdentifierGenerator;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
Expand Down Expand Up @@ -54,7 +53,7 @@ protected void doLinkAttribute(String assetId, Attribute<?> attribute, MQTTAgent
Consumer<MQTTMessage<String>> messageConsumer = msg -> updateLinkedAttribute(
new AttributeState(assetId, attribute.getName(), msg.payload)
);
client.ioClient.addMessageConsumer(topic, messageConsumer);
client.addMessageConsumer(topic, messageConsumer);
protocolMessageConsumers.put(new AttributeRef(assetId, attribute.getName()), messageConsumer);
});
}
Expand All @@ -65,20 +64,19 @@ protected void doUnlinkAttribute(String assetId, Attribute<?> attribute, MQTTAge
AttributeRef attributeRef = new AttributeRef(assetId, attribute.getName());
Consumer<MQTTMessage<String>> messageConsumer = protocolMessageConsumers.remove(attributeRef);
if (messageConsumer != null) {
client.ioClient.removeMessageConsumer(topic, messageConsumer);
client.removeMessageConsumer(topic, messageConsumer);
}
});
}

@Override
protected ProtocolIOClient<MQTTMessage<String>, MQTT_IOClient> createIoClient() throws Exception {
MQTT_IOClient client = doCreateIoClient();
ProtocolIOClient<MQTTMessage<String>, MQTT_IOClient> protocolIoClient = new ProtocolIOClient<>(client, this::onConnectionStatusChanged, null);
this.client = protocolIoClient;
return protocolIoClient;
protected MQTT_IOClient createIoClient() throws Exception {
MQTT_IOClient client = super.createIoClient();
// Don't want the default message consumer, topic specific consumers will do the message routing for us
client.removeAllMessageConsumers();
return client;
}


@Override
protected MQTT_IOClient doCreateIoClient() throws Exception {
String host = agent.getHost().orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void doUnlinkAttribute(String assetId, Attribute<?> attribute, Default

@Override
protected Supplier<ChannelHandler[]> getEncoderDecoderProvider() {
return getGenericStringEncodersAndDecoders(client.ioClient, agent);
return getGenericStringEncodersAndDecoders(client, agent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void doUnlinkAttribute(String assetId, Attribute<?> attribute, Default

@Override
protected Supplier<ChannelHandler[]> getEncoderDecoderProvider() {
return getGenericStringEncodersAndDecoders(client.ioClient, agent);
return getGenericStringEncodersAndDecoders(client, agent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void doUnlinkAttribute(String assetId, Attribute<?> attribute, Default

@Override
protected Supplier<ChannelHandler[]> getEncoderDecoderProvider() {
return getGenericStringEncodersAndDecoders(client.ioClient, agent);
return getGenericStringEncodersAndDecoders(client, agent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected void doStop(Container container) throws Exception {

@Override
protected Supplier<ChannelHandler[]> getEncoderDecoderProvider() {
return getGenericStringEncodersAndDecoders(client.ioClient, agent);
return getGenericStringEncodersAndDecoders(client, agent);
}

@Override
Expand Down Expand Up @@ -188,7 +188,7 @@ protected void doLinkAttribute(String assetId, Attribute<?> attribute, Websocket
subscriptions.ifPresent(websocketSubscriptions -> {
Runnable task = () -> doSubscriptions(clientHeaders, websocketSubscriptions);
addAttributeConnectedTask(attributeRef, task);
if (client.ioClient.getConnectionStatus() == ConnectionStatus.CONNECTED) {
if (client.getConnectionStatus() == ConnectionStatus.CONNECTED) {
executorService.schedule(task, 1000, TimeUnit.MILLISECONDS);
}
});
Expand Down Expand Up @@ -236,15 +236,15 @@ protected void addAttributeConnectedTask(AttributeRef attributeRef, Runnable tas
}

protected void doSubscriptions(Map<String, List<String>> headers, WebsocketSubscription[] subscriptions) {
LOG.info("Executing subscriptions for websocket: " + client.ioClient.getClientUri());
LOG.info("Executing subscriptions for websocket: " + client.getClientUri());

// Inject OAuth header
if (!TextUtil.isNullOrEmpty(client.ioClient.authHeaderValue)) {
if (!TextUtil.isNullOrEmpty(client.authHeaderValue)) {
if (headers == null) {
headers = new MultivaluedHashMap<>();
}
headers.remove(HttpHeaders.AUTHORIZATION);
headers.put(HttpHeaders.AUTHORIZATION, Collections.singletonList(client.ioClient.authHeaderValue));
headers.put(HttpHeaders.AUTHORIZATION, Collections.singletonList(client.authHeaderValue));
}

Map<String, List<String>> finalHeaders = headers;
Expand Down Expand Up @@ -315,7 +315,7 @@ protected void doSubscription(Map<String, List<String>> headers, WebsocketSubscr
LOG.warning("WebsocketHttpSubscription returned an un-successful response code: " + response.getStatus());
}
} else {
client.ioClient.sendMessage(ValueUtil.convert(subscription.body, String.class));
client.sendMessage(ValueUtil.convert(subscription.body, String.class));
}
}
}

0 comments on commit 65a4690

Please sign in to comment.