Skip to content

Commit

Permalink
Introduced AttributeEvent multithreaded processing BREAKING (openremo…
Browse files Browse the repository at this point in the history
…te#1205)

- Asset update processing chain removed; services can now register an interceptor on the AssetProcessingService and/or listen to AttributeEvents generated after an attribute is updated in the DB

- AssetUpdateProcessor interface replaced with AttributeEventInterceptor

- AssetState removed and replaced with AttributeInfo interface implemented by the now enhanced AttributeEvent object

- EC2 cloudformation template updates

- Procotol refactor - removed use of camel routes for protocol event processing

- ProtocolAssetService interface moved to model package

- AgentProtocolAssetService now has an instance per Agent

- AttributeEvent source now added as a property of the event (for internal use) rather than using the camel exchange header
  • Loading branch information
richturner authored Jan 25, 2024
1 parent 5bc183c commit 531c73a
Showing 170 changed files with 3,193 additions and 3,541 deletions.
510 changes: 267 additions & 243 deletions .ci_cd/aws/cloudformation-create-ec2.yml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions .ci_cd/env/.env
Original file line number Diff line number Diff line change
@@ -4,3 +4,5 @@ DAILY_BACKUP=false
ENV_COMPOSE_FILE=profile/demo.yml
OR_SETUP_TYPE=demo
OR_EMAIL_TO="developers@openremote.io"
OR_SETUP_RUN_ON_RESTART=true
DAILY_RESTART=true
2 changes: 0 additions & 2 deletions .ci_cd/env/demo.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
OR_HOSTNAME=demo.openremote.app
OR_ADDITIONAL_HOSTNAMES=demo.openremote.io
OR_SETUP_RUN_ON_RESTART=true
DAILY_RESTART=true
2 changes: 2 additions & 0 deletions .ci_cd/env/load1.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DAILY_RESTART=false
OR_SETUP_TYPE=load1
2 changes: 0 additions & 2 deletions .ci_cd/env/staging.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
OR_HOSTNAME=staging.demo.openremote.app
OR_ADDITIONAL_HOSTNAMES=staging.demo.openremote.io
OR_SETUP_RUN_ON_RESTART=true
DAILY_RESTART=true
233 changes: 47 additions & 186 deletions agent/src/main/java/org/openremote/agent/protocol/AbstractProtocol.java
Original file line number Diff line number Diff line change
@@ -20,13 +20,10 @@
package org.openremote.agent.protocol;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.timer.TimerService;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.agent.Agent;
import org.openremote.model.asset.agent.AgentLink;
import org.openremote.model.asset.agent.ConnectionStatus;
@@ -35,16 +32,17 @@
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeRef;
import org.openremote.model.attribute.AttributeState;
import org.openremote.model.protocol.ProtocolAssetService;
import org.openremote.model.protocol.ProtocolUtil;
import org.openremote.model.query.AssetQuery;
import org.openremote.model.util.Pair;
import org.openremote.model.util.TextUtil;

import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.openremote.model.protocol.ProtocolUtil.hasDynamicWriteValue;
import static org.openremote.model.syslog.SyslogCategory.PROTOCOL;
@@ -62,53 +60,26 @@ public abstract class AbstractProtocol<T extends Agent<T, ?, U>, U extends Agent
protected ProtocolPredictedDatapointService predictedDatapointService;
protected ProtocolDatapointService datapointService;
protected T agent;
protected final Object processorLock = new Object();

public AbstractProtocol(T agent) {
this.agent = agent;
}

@Override
public void setAssetService(ProtocolAssetService assetService) {
this.assetService = assetService;
}

@Override
public void start(Container container) throws Exception {
timerService = container.getService(TimerService.class);
executorService = container.getExecutorService();
assetService = proxyAssetService(container.getService(ProtocolAssetService.class));
predictedDatapointService = container.getService(ProtocolPredictedDatapointService.class);
datapointService = container.getService(ProtocolDatapointService.class);
messageBrokerContext = container.getService(MessageBrokerService.class).getContext();

// TODO: Priority should each protocol instance have its' own camel route
try {
messageBrokerContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from(ACTUATOR_TOPIC)
.routeId("ProtocolInbound-" + getProtocolName() + getAgent().getId())
.process(exchange -> {
final Protocol<?> protocolInstance = exchange.getIn().getHeader(ACTUATOR_TOPIC_TARGET_PROTOCOL, Protocol.class);
if (protocolInstance != AbstractProtocol.this) {
return;
}
synchronized (this) {
AttributeEvent event = exchange.getIn().getBody(AttributeEvent.class);
Attribute<?> linkedAttribute = getLinkedAttributes().get(event.getAttributeRef());

if (linkedAttribute == null) {
LOG.log(System.Logger.Level.INFO, () -> "Attempt to write to attribute that is not actually linked to this protocol '" + AbstractProtocol.this + "': " + event);
return;
}

processLinkedAttributeWrite(linkedAttribute, event);
}
});
}
});

doStart(container);

} catch (Exception ex) {
throw new RuntimeException(ex);
}
this.producerTemplate = container.getService(MessageBrokerService.class).getProducerTemplate();
doStart(container);
}

@Override
@@ -170,26 +141,29 @@ public Map<AttributeRef, Attribute<?>> getLinkedAttributes() {
return linkedAttributes;
}

final protected void processLinkedAttributeWrite(Attribute<?> attribute, AttributeEvent event) {
LOG.log(System.Logger.Level.TRACE, () -> "Processing linked attribute write on protocol '" + this + "': " + event);
AgentLink<?> agentLink = agent.getAgentLink(attribute);

Pair<Boolean, Object> ignoreAndConverted = ProtocolUtil.doOutboundValueProcessing(
event.getAssetId(),
attribute,
agentLink,
event.getValue().orElse(null),
dynamicAttributes.contains(event.getAttributeRef()));

if (ignoreAndConverted.key) {
LOG.log(System.Logger.Level.DEBUG, "Value conversion returned ignore so attribute will not write to protocol: " + event.getAttributeRef());
return;
}
@Override
public void processLinkedAttributeWrite(AttributeEvent event) {
synchronized (processorLock) {
LOG.log(System.Logger.Level.TRACE, () -> "Processing linked attribute write on protocol '" + this + "': " + event);
AgentLink<?> agentLink = agent.getAgentLink(event);

Pair<Boolean, Object> ignoreAndConverted = ProtocolUtil.doOutboundValueProcessing(
event.getId(),
event,
agentLink,
event.getValue().orElse(null),
dynamicAttributes.contains(event.getRef()));

if (ignoreAndConverted.key) {
LOG.log(System.Logger.Level.DEBUG, "Value conversion returned ignore so attribute will not write to protocol: " + event.getRef());
return;
}

doLinkedAttributeWrite(attribute, agent.getAgentLink(attribute), event, ignoreAndConverted.value);
doLinkedAttributeWrite(agent.getAgentLink(event), event, ignoreAndConverted.value);

if (agent.isUpdateOnWrite().orElse(false) || agentLink.getUpdateOnWrite().orElse(false)) {
updateLinkedAttribute(new AttributeState(event.getAttributeRef(), ignoreAndConverted.value));
if (agent.isUpdateOnWrite().orElse(false) || agentLink.getUpdateOnWrite().orElse(false)) {
updateLinkedAttribute(new AttributeState(event.getRef(), ignoreAndConverted.value));
}
}
}

@@ -208,18 +182,16 @@ final protected void sendAttributeEvent(AttributeState state) {
*/
final protected void sendAttributeEvent(AttributeEvent event) {
// Don't allow updating linked attributes with this mechanism as it could cause an infinite loop
if (linkedAttributes.containsKey(event.getAttributeRef())) {
if (linkedAttributes.containsKey(event.getRef())) {
LOG.log(System.Logger.Level.WARNING, () -> "Cannot update an attribute linked to the same protocol; use updateLinkedAttribute for that: " + event);
return;
}
assetService.sendAttributeEvent(event);
}

/**
* Update the value of a linked attribute. Call this to publish new sensor values. This will call
* {@link ProtocolUtil#doInboundValueProcessing} before sending on the sensor queue.
*/
final protected void updateLinkedAttribute(final AttributeState state, long timestamp) {

@Override
final public void updateLinkedAttribute(final AttributeState state, long timestamp) {
Attribute<?> attribute = linkedAttributes.get(state.getRef());

if (attribute == null) {
@@ -236,30 +208,21 @@ final protected void updateLinkedAttribute(final AttributeState state, long time

AttributeEvent attributeEvent = new AttributeEvent(new AttributeState(state.getRef(), ignoreAndConverted.value), timestamp);
LOG.log(System.Logger.Level.TRACE, () -> "Sending linked attribute update: " + attributeEvent);
producerTemplate.sendBodyAndHeader(SENSOR_QUEUE, attributeEvent, Protocol.SENSOR_QUEUE_SOURCE_PROTOCOL, getProtocolName());
}

/**
* Update the value of one of this {@link Protocol}s linked {@link Agent}'s {@link Attribute}s.
*/
final protected void updateAgentAttribute(final AttributeState state) {
if (!agent.getAttributes().has(state.getRef().getName()) || !agent.getId().equals(state.getRef().getId())) {
LOG.log(System.Logger.Level.WARNING, () -> "Attempt to update non existent agent attribute or agent ID is incorrect: " + state);
return;
}
AttributeEvent attributeEvent = new AttributeEvent(state, timerService.getCurrentTimeMillis());
LOG.log(System.Logger.Level.TRACE, () -> "Sending protocol agent attribute update: " + attributeEvent);
assetService.sendAttributeEvent(attributeEvent);
}

/**
* Update the value of a linked attribute, with the current system time as event time see
* {@link #updateLinkedAttribute(AttributeState, long)} for more details.
*/
final protected void updateLinkedAttribute(AttributeState state) {
@Override
final public void updateLinkedAttribute(final AttributeState state) {
updateLinkedAttribute(state, timerService.getCurrentTimeMillis());
}

@Override
public boolean onAgentAttributeChanged(AttributeEvent event) {
// If event is for an agent attribute then we can try and handle it here in a generic way
Agent<?,?,?> agent = getAgent();
return agent.isConfigurationAttribute(event.getName());
}

/**
* Start this protocol instance
*/
@@ -291,107 +254,5 @@ public String toString() {
* (see {@link ProtocolUtil#doOutboundValueProcessing}). Protocol implementations should generally use the
* processedValue but may also choose to use the original value for some purpose if required.
*/
abstract protected void doLinkedAttributeWrite(Attribute<?> attribute, U agentLink, AttributeEvent event, Object processedValue);

private ProtocolAssetService proxyAssetService(ProtocolAssetService protocolAssetService) {
return new ProtocolAssetService() {

@Override
public <T extends Asset<?>> T mergeAsset(T asset) {
if (TextUtil.isNullOrEmpty(asset.getRealm())) {
asset.setRealm(getAgent().getRealm());
} else if (!Objects.equals(asset.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to merge asset into another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to merge asset into another realm");
}
return protocolAssetService.mergeAsset(asset);
}

@Override
public boolean deleteAssets(String... assetIds) {
for (String assetId: assetIds) {
Asset<?> asset = protocolAssetService.findAsset(assetId);
if (asset != null) {
if (!Objects.equals(asset.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to delete asset from another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to delete asset from another realm");
}
}
}
return protocolAssetService.deleteAssets(assetIds);
}

@Override
public <T extends Asset<?>> T findAsset(String assetId, Class<T> assetType) {
T asset = protocolAssetService.findAsset(assetId, assetType);
if (asset != null) {
if (!Objects.equals(asset.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to find asset from another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to find asset from another realm");
}
}
return asset;
}

@Override
public <T extends Asset<?>> T findAsset(String assetId) {
T asset = protocolAssetService.findAsset(assetId);
if (asset != null) {
if (!Objects.equals(asset.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to find asset from another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to find asset from another realm");
}
}
return asset;
}

@Override
public List<Asset<?>> findAssets(String assetId, AssetQuery assetQuery) {
List<Asset<?>> assets = protocolAssetService.findAssets(assetId, assetQuery);
for (Asset<?> asset : assets) {
if (!Objects.equals(asset.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to find asset from another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to find asset from another realm");
}
}
return assets;
}

@Override
public void sendAttributeEvent(AttributeEvent attributeEvent) {
if (TextUtil.isNullOrEmpty(attributeEvent.getRealm())) {
attributeEvent.setRealm(getAgent().getRealm());
} else if (!Objects.equals(attributeEvent.getRealm(), getAgent().getRealm())) {
Protocol.LOG.warning("Protocol attempting to send attribute event to another realm: " + agent);
throw new IllegalArgumentException("Protocol attempting to send attribute event to another realm");
}
protocolAssetService.sendAttributeEvent(attributeEvent);
}

@Override
public void subscribeChildAssetChange(String agentId, Consumer<PersistenceEvent<Asset<?>>> assetChangeConsumer) {
protocolAssetService.subscribeChildAssetChange(agentId, assetChangeConsumer);
}

@Override
public void unsubscribeChildAssetChange(String agentId, Consumer<PersistenceEvent<Asset<?>>> assetChangeConsumer) {
protocolAssetService.unsubscribeChildAssetChange(agentId, assetChangeConsumer);
}

@Override
public void init(Container container) throws Exception {
protocolAssetService.init(container);
}

@Override
public void start(Container container) throws Exception {
protocolAssetService.start(container);
}

@Override
public void stop(Container container) throws Exception {
protocolAssetService.stop(container);
}
};
}
abstract protected void doLinkedAttributeWrite(U agentLink, AttributeEvent event, Object processedValue);
}
Loading

0 comments on commit 531c73a

Please sign in to comment.