Skip to content

Commit

Permalink
Bug fix for JSON rules notification TRIGGER_ASSETS (fixes openremote#…
Browse files Browse the repository at this point in the history
  • Loading branch information
richturner authored Jun 23, 2023
1 parent 0ab92ec commit bd69894
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public boolean test(AssetState<?> assetState) {

if (query.attributes != null) {
// TODO: LogicGroup AND doesn't make much sense when applying to a single asset state
if (!asAssetPredicate(timerService::getCurrentTimeMillis, query.attributes).test(Collections.singletonList(assetState))) {
Set<AssetState<?>> matches = asAssetStateMatcher(timerService::getCurrentTimeMillis, query.attributes).apply(Collections.singleton(assetState));
if (matches == null) {
return false;
}
}
Expand Down Expand Up @@ -180,17 +181,18 @@ public static Predicate<NameValueHolder<?>> asPredicate(Supplier<Long> currentMi
}

/**
* A predicate for {@link AssetState}s of an asset; the states must be related to the same asset to allow
* A function for matching {@link AssetState}s of an asset; the states must be related to the same asset to allow
* {@link LogicGroup.Operator#AND} to be applied.
* @return The matched asset states or null if there is no match
*/
@SuppressWarnings("unchecked")
public static Predicate<List<AssetState<?>>> asAssetPredicate(Supplier<Long> currentMillisProducer, LogicGroup<AttributePredicate> condition) {
public static Function<Collection<AssetState<?>>, Set<AssetState<?>>> asAssetStateMatcher(Supplier<Long> currentMillisProducer, LogicGroup<AttributePredicate> condition) {
if (groupIsEmpty(condition)) {
return as -> true;
return as -> Collections.EMPTY_SET;
}

LogicGroup.Operator operator = condition.operator == null ? LogicGroup.Operator.AND : condition.operator;
List<Predicate<List<AssetState<?>>>> assetPredicates = new ArrayList<>();
List<Function<Collection<AssetState<?>>, Set<AssetState<?>>>> assetStateMatchers = new ArrayList<>();
List<Predicate<AssetState<?>>> attributePredicates = new ArrayList<>();

if (condition.getItems().size() > 0) {
Expand Down Expand Up @@ -224,56 +226,69 @@ public static Predicate<List<AssetState<?>>> asAssetPredicate(Supplier<Long> cur
});
}

if (condition.groups != null && condition.groups.size() > 0) {
assetPredicates.addAll(
condition.groups.stream()
.map(c -> asAssetPredicate(currentMillisProducer, c)).toList()
);
}

if (operator == LogicGroup.Operator.AND) {
// All predicates must match at least one of the asset's state
assetPredicates.add(assetStates ->
attributePredicates.stream().allMatch(attributePredicate -> assetStates.stream().anyMatch(attributePredicate))
);
assetStateMatchers.add(assetStates -> {
Set<AssetState<?>> matchedAssetStates = new HashSet<>();
boolean allPredicatesMatch = attributePredicates.stream().allMatch(attributePredicate -> {
// Find the first match as an attribute predicate shouldn't match more than one asset state
return assetStates.stream().filter(attributePredicate).findFirst().map(matchedAssetState -> {
matchedAssetStates.add(matchedAssetState);
return true;
}).orElse(false);
});
return allPredicatesMatch ? matchedAssetStates : null;
});
} else {
// Any of the predicates must match at least one of the asset's state
assetPredicates.add(assetStates ->
attributePredicates.stream().anyMatch(attributePredicate -> assetStates.stream().anyMatch(attributePredicate))
);
assetStateMatchers.add(assetStates -> {
AtomicReference<AssetState<?>> firstMatch = new AtomicReference<>();
boolean anyPredicateMatch = attributePredicates.stream().anyMatch(attributePredicate -> {
// Find the first match as an attribute predicate shouldn't match more than one asset state
return assetStates.stream().filter(attributePredicate).findFirst().map(matchedAssetState -> {
firstMatch.set(matchedAssetState);
return true;
}).orElse(false);
});
return anyPredicateMatch ? Collections.singleton(firstMatch.get()) : null;
});
}

return asPredicate(assetPredicates, operator);
}

protected static boolean groupIsEmpty(LogicGroup<?> condition) {
return condition.getItems().size() == 0
&& (condition.groups == null || condition.groups.isEmpty());
}

protected static <T> Predicate<T> asPredicate(Collection<Predicate<T>> predicates, LogicGroup.Operator operator) {
return in -> {
boolean matched = false;
if (condition.groups != null && condition.groups.size() > 0) {
assetStateMatchers.addAll(
condition.groups.stream()
.map(c -> asAssetStateMatcher(currentMillisProducer, c)).toList()
);
}

for (Predicate<T> p : predicates) {
return assetStates -> {
Set<AssetState<?>> matchedStates = new HashSet<>();

if (p.test(in)) {
matched = true;
for (Function<Collection<AssetState<?>>, Set<AssetState<?>>> matcher : assetStateMatchers) {
Set<AssetState<?>> matcherMatchedStates = matcher.apply(assetStates);

if (matcherMatchedStates != null) {
// We have a match
if (operator == LogicGroup.Operator.OR) {
break;
return matcherMatchedStates;
}
matchedStates.addAll(matcherMatchedStates);
} else {
matched = false;

// No match
if (operator == LogicGroup.Operator.AND) {
break;
return null;
}
}
}

return matched;
return operator == LogicGroup.Operator.OR ? null : matchedStates;
};
}

protected static boolean groupIsEmpty(LogicGroup<?> condition) {
return condition.getItems().size() == 0
&& (condition.groups == null || condition.groups.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -90,7 +91,7 @@ static class RuleConditionState {
AssetQuery.OrderBy orderBy;
int limit;
LogicGroup<AttributePredicate> attributePredicates = null;
Predicate<List<AssetState<?>>> assetPredicate = null;
Function<Collection<AssetState<?>>, Set<AssetState<?>>> assetPredicate = null;
Set<AssetState<?>> unfilteredAssetStates = new HashSet<>();
Set<AssetState<?>> previouslyMatchedAssetStates = new HashSet<>();
Set<AssetState<?>> previouslyUnmatchedAssetStates;
Expand Down Expand Up @@ -199,7 +200,7 @@ public RuleConditionState(RuleCondition ruleCondition, boolean trackUnmatched, T
// Only supports a single level or logic group for attributes (i.e. cannot nest groups in the UI so
// don't support it here either)
attributePredicates.groups = null;
assetPredicate = AssetQueryPredicate.asAssetPredicate(timerService::getCurrentTimeMillis, attributePredicates);
assetPredicate = AssetQueryPredicate.asAssetStateMatcher(timerService::getCurrentTimeMillis, attributePredicates);
}
ruleCondition.assets.orderBy = null;
ruleCondition.assets.limit = 0;
Expand Down Expand Up @@ -285,8 +286,10 @@ void update(Map<String, Long> nextRecurAssetIdMap) {
results.put(false, unmatched);

unfilteredAssetStates.stream().collect(Collectors.groupingBy(AssetState::getId)).forEach((id, states) -> {
if (assetPredicate.test(states)) {
matched.addAll(states);
Set<AssetState<?>> matches = assetPredicate.apply(states);
if (matches != null) {
matched.addAll(matches);
unmatched.addAll(states.stream().filter(matches::contains).collect(Collectors.toSet()));
} else {
unmatched.addAll(states);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.openremote.manager.setup.SetupService
import org.openremote.manager.webhook.WebhookService
import org.openremote.model.asset.Asset
import org.openremote.model.asset.UserAssetLink
import org.openremote.model.asset.impl.ConsoleAsset
import org.openremote.model.asset.impl.ThingAsset
import org.openremote.model.attribute.Attribute
import org.openremote.model.attribute.AttributeEvent
Expand Down Expand Up @@ -270,9 +271,11 @@ class JsonRulesTest extends Specification implements ManagerContainerTrait {
["manager"] as String[])
consoleRegistration2 = authenticatedConsoleResource2.register(null, consoleRegistration2)

and: "the console location is marked as RULE_STATE"
and: "the console attributes are marked as RULE_STATE (inc LOCATION which is needed to trigger the rule)"
def asset = assetStorageService.find(consoleRegistration.id)
asset.getAttribute(Asset.LOCATION).ifPresent { it.addMeta(new MetaItem<>(MetaItemType.RULE_STATE))}
asset.getAttribute(ConsoleAsset.CONSOLE_NAME).ifPresent { it.addMeta(new MetaItem<>(MetaItemType.RULE_STATE))}
asset.getAttribute(ConsoleAsset.CONSOLE_VERSION).ifPresent { it.addMeta(new MetaItem<>(MetaItemType.RULE_STATE))}
asset = assetStorageService.merge(asset)

then: "a geofence refresh notification should have been sent to the console"
Expand Down Expand Up @@ -342,7 +345,7 @@ class JsonRulesTest extends Specification implements ManagerContainerTrait {
assert pushTargetsAndMessages.count {it.v2.title == "Linked user test 2" && it.v1.type == Notification.TargetType.ASSET && it.v1.id == consoleRegistration2.id} == 1
}

and: "an email notification should have been sent to test@openremote.io with the triggered asset in the body"
and: "an email notification should have been sent to test@openremote.io with the triggered asset in the body but only containing the triggered asset states"
conditions.eventually {
assert emailMessages.any {it.getRecipients(jakarta.mail.Message.RecipientType.TO).length == 1
&& (it.getRecipients(jakarta.mail.Message.RecipientType.TO)[0] as InternetAddress).address == "test@openremote.io"
Expand Down

0 comments on commit bd69894

Please sign in to comment.