Skip to content

Commit

Permalink
Improved push notification target filtering and made NotificationFaca…
Browse files Browse the repository at this point in the history
…de async (openremote#1102)
  • Loading branch information
richturner authored Aug 8, 2023
1 parent f55d411 commit 9a95ad4
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -880,13 +880,11 @@ public boolean delete(List<String> assetIds, boolean skipGatewayCheck) {
return false;
});

//noinspection ConstantConditions
if (gatewayIdAssetIdMap.isEmpty() && ids.isEmpty()) {
return true;
}

// This is not atomic across gateways
//noinspection ConstantConditions
if (!gatewayIdAssetIdMap.isEmpty()) {
for (Map.Entry<String, List<String>> gatewayIdAssetIds : gatewayIdAssetIdMap.entrySet()) {
String gatewayId = gatewayIdAssetIds.getKey();
Expand Down Expand Up @@ -1416,23 +1414,21 @@ protected void publishAttributeEvent(Asset<?> asset, Attribute<?> attribute) {
protected void publishModificationEvents(PersistenceEvent<Asset<?>> persistenceEvent) {
Asset<?> asset = persistenceEvent.getEntity();
switch (persistenceEvent.getCause()) {
case CREATE:
case CREATE -> {
// Fully load the asset
Asset<?> loadedAsset = find(new AssetQuery().ids(asset.getId()));

if (loadedAsset == null) {
return;
}

if (LOG.isLoggable(Level.FINEST)) {
LOG.finest("Asset created: " + loadedAsset.toStringAll());
} else {
LOG.fine("Asset created: " + loadedAsset);
}

clientEventService.publishEvent(
new AssetEvent(AssetEvent.Cause.CREATE, loadedAsset, null)
);
}

// // Raise attribute event for each attribute
// asset.getAttributes().forEach(newAttribute ->
Expand All @@ -1443,9 +1439,7 @@ protected void publishModificationEvents(PersistenceEvent<Asset<?>> persistenceE
// newAttribute.getTimestamp().orElse(timerService.getCurrentTimeMillis()))
// .setParentId(asset.getParentId()).setRealm(asset.getRealm())
// ));
break;
case UPDATE:

case UPDATE -> {
boolean attributesChanged = persistenceEvent.hasPropertyChanged("attributes");

// String[] updatedProperties = Arrays.stream(persistenceEvent.getPropertyNames()).filter(propertyName -> {
Expand All @@ -1455,14 +1449,11 @@ protected void publishModificationEvents(PersistenceEvent<Asset<?>> persistenceE
// }).toArray(String[]::new);

// Fully load the asset
loadedAsset = find(new AssetQuery().ids(asset.getId()));

Asset<?> loadedAsset = find(new AssetQuery().ids(asset.getId()));
if (loadedAsset == null) {
return;
}

LOG.finest("Asset updated: " + persistenceEvent);

clientEventService.publishEvent(
new AssetEvent(AssetEvent.Cause.UPDATE, loadedAsset, persistenceEvent.getPropertyNames().toArray(String[]::new))
);
Expand Down Expand Up @@ -1490,15 +1481,13 @@ protected void publishModificationEvents(PersistenceEvent<Asset<?>> persistenceE
publishAttributeEvent(asset, newOrModifiedAttribute)
);
}
break;
case DELETE:

}
case DELETE -> {
if (LOG.isLoggable(Level.FINEST)) {
LOG.finest("Asset deleted: " + asset.toStringAll());
} else {
LOG.fine("Asset deleted: " + asset);
}

clientEventService.publishEvent(
new AssetEvent(AssetEvent.Cause.DELETE, asset, null)
);
Expand All @@ -1509,7 +1498,7 @@ protected void publishModificationEvents(PersistenceEvent<Asset<?>> persistenceE
clientEventService.publishEvent(
AttributeEvent.deletedAttribute(asset.getId(), obsoleteAttribute.getName())
));
break;
}
}
}

Expand Down Expand Up @@ -1645,21 +1634,11 @@ protected static String buildOrderByString(AssetQuery query) {
sb.append(" order by ");

switch (query.orderBy.property) {
case CREATED_ON:
sb.append(" A.CREATED_ON ");
break;
case ASSET_TYPE:
sb.append(" A.TYPE ");
break;
case NAME:
sb.append(" A.NAME ");
break;
case PARENT_ID:
sb.append(" A.PARENT_ID ");
break;
case REALM:
sb.append(" A.REALM ");
break;
case CREATED_ON -> sb.append(" A.CREATED_ON ");
case ASSET_TYPE -> sb.append(" A.TYPE ");
case NAME -> sb.append(" A.NAME ");
case PARENT_ID -> sb.append(" A.PARENT_ID ");
case REALM -> sb.append(" A.REALM ");
}
sb.append(query.orderBy.descending ? "desc " : "asc ");
}
Expand All @@ -1674,7 +1653,6 @@ protected static String buildLimitString(AssetQuery query) {
return "";
}

@SuppressWarnings("unchecked")
protected static boolean appendWhereClause(StringBuilder sb, AssetQuery query, int level, List<ParameterBinder> binders, Supplier<Long> timeProvider) {
// level = 1 is main query
// level = 2 is union
Expand Down Expand Up @@ -1864,8 +1842,7 @@ protected static boolean addNameValuePredicates(List<? extends NameValuePredicat

sb.append(buildNameValuePredicateFilter(nameValuePredicate, jsonObjName, binders, timeProvider));

if (nameValuePredicate instanceof AttributePredicate) {
AttributePredicate attributePredicate = (AttributePredicate)nameValuePredicate;
if (nameValuePredicate instanceof AttributePredicate attributePredicate) {

if (attributePredicate.meta != null && attributePredicate.meta.length > 0) {
String metaJsonObjName = jsonObjName + "_AM" + metaIndex++;
Expand Down Expand Up @@ -1919,46 +1896,52 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
// Inserts the SQL string and adds the parameters
BiConsumer<StringBuilder, List<ParameterBinder>> valuePathInserter;
boolean isAttributePredicate = nameValuePredicate instanceof AttributePredicate;
boolean isTextCompare = (nameValuePredicate.value instanceof ValueEmptyPredicate) || (nameValuePredicate.value instanceof StringPredicate);
final String operator = isTextCompare ? "#>>" : "#>";

if (nameValuePredicate.path == null || nameValuePredicate.path.getPaths().length == 0) {
valuePathInserter = (sb, b) ->
sb.append(isAttributePredicate ? "(" + jsonObjName + ".VALUE #> '{value}')" : jsonObjName + ".VALUE");
valuePathInserter = (sb, b) -> {
if (isAttributePredicate) {
sb.append("(").append(jsonObjName).append(".VALUE ").append(operator).append(" '{value}')");
} else {
sb.append(jsonObjName).append(".VALUE");
if (isTextCompare) {
sb.append(" #>> '{}'");
}
}
};
} else {
List<String> paths = new ArrayList<>();
if (isAttributePredicate) {
paths.add("value");
}
paths.addAll(Arrays.stream(nameValuePredicate.path.getPaths()).map(Object::toString).collect(Collectors.toList()));
paths.addAll(Arrays.stream(nameValuePredicate.path.getPaths()).map(Object::toString).toList());

valuePathInserter = (sb, b) -> {
final int pos = binders.size() + 1;
sb.append("(").append(jsonObjName).append(".VALUE #> ?").append(pos).append(")");
sb.append("(").append(jsonObjName).append(".VALUE ").append(operator).append(" ?").append(pos).append(")");
binders.add((em, st) -> st.setParameter(pos, paths.toArray(new String[0]), StringArrayType.INSTANCE));
};
}

if (nameValuePredicate.value instanceof StringPredicate) {
StringPredicate stringPredicate = (StringPredicate) nameValuePredicate.value;
if (nameValuePredicate.value instanceof StringPredicate stringPredicate) {
if (!stringPredicate.caseSensitive) {
attributeBuilder.append("upper(");
}
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder.append(" #>> '{}'");
if (!stringPredicate.caseSensitive) {
attributeBuilder.append(")");
}
final int pos = binders.size() + 1;
attributeBuilder.append(StringPredicate.toSQLParameter(stringPredicate, pos, false));
binders.add((em, st) -> st.setParameter(pos, stringPredicate.prepareValue()));
} else if (nameValuePredicate.value instanceof BooleanPredicate) {
BooleanPredicate booleanPredicate = (BooleanPredicate) nameValuePredicate.value;
} else if (nameValuePredicate.value instanceof BooleanPredicate booleanPredicate) {
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder
.append(" = to_jsonb(")
.append(booleanPredicate.value)
.append(")");
} else if (nameValuePredicate.value instanceof DateTimePredicate) {
DateTimePredicate dateTimePredicate = (DateTimePredicate) nameValuePredicate.value;
} else if (nameValuePredicate.value instanceof DateTimePredicate dateTimePredicate) {
attributeBuilder.append("(");
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder
Expand All @@ -1974,8 +1957,7 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
final int pos2 = binders.size() + 1;
binders.add((em, st) -> st.setParameter(pos2, new java.sql.Timestamp(fromAndTo.value != null ? fromAndTo.value : Long.MAX_VALUE)));
}
} else if (nameValuePredicate.value instanceof NumberPredicate) {
NumberPredicate numberPredicate = (NumberPredicate) nameValuePredicate.value;
} else if (nameValuePredicate.value instanceof NumberPredicate numberPredicate) {
attributeBuilder.append("(");
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder
Expand All @@ -1987,8 +1969,7 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
final int pos2 = binders.size() + 1;
binders.add((em, st) -> st.setParameter(pos2, numberPredicate.rangeValue));
}
} else if (nameValuePredicate.value instanceof ArrayPredicate) {
ArrayPredicate arrayPredicate = (ArrayPredicate) nameValuePredicate.value;
} else if (nameValuePredicate.value instanceof ArrayPredicate arrayPredicate) {
if (arrayPredicate.negated) {
attributeBuilder.append("NOT(");
}
Expand Down Expand Up @@ -2032,8 +2013,7 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
attributeBuilder.append(")");
}
} else if (nameValuePredicate.value instanceof GeofencePredicate) {
if (nameValuePredicate.value instanceof RadialGeofencePredicate) {
RadialGeofencePredicate location = (RadialGeofencePredicate) nameValuePredicate.value;
if (nameValuePredicate.value instanceof RadialGeofencePredicate location) {
attributeBuilder.append("ST_DistanceSphere(ST_MakePoint((");
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder
Expand All @@ -2048,8 +2028,7 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
.append(location.lat)
.append(location.negated ? ")) > " : ")) <= ")
.append(location.radius);
} else if (nameValuePredicate.value instanceof RectangularGeofencePredicate) {
RectangularGeofencePredicate location = (RectangularGeofencePredicate) nameValuePredicate.value;
} else if (nameValuePredicate.value instanceof RectangularGeofencePredicate location) {
if (location.negated) {
attributeBuilder.append("NOT");
}
Expand All @@ -2073,8 +2052,9 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal
.append("))");
}
} else if (nameValuePredicate.value instanceof ValueEmptyPredicate) {
// Two situations - key is present and not null (cannot use IS NULL for this) or key is not present at all (have to use IS NULL for this)
valuePathInserter.accept(attributeBuilder, binders);
attributeBuilder.append(((ValueEmptyPredicate) nameValuePredicate.value).negate ? "\\:\\:text IS NOT NULL" : "\\:\\:text IS NULL");
attributeBuilder.append(((ValueEmptyPredicate) nameValuePredicate.value).negate ? " IS NOT NULL" : " IS NULL");
} else if (nameValuePredicate.value instanceof CalendarEventPredicate) {
final int pos = binders.size() + 1;
java.sql.Timestamp when = new java.sql.Timestamp(((CalendarEventPredicate)nameValuePredicate.value).timestamp.getTime());
Expand Down Expand Up @@ -2118,36 +2098,42 @@ protected static String buildNameValuePredicateFilter(NameValuePredicate nameVal

protected static String buildOperatorFilter(AssetQuery.Operator operator, boolean negate, int pos) {
switch (operator) {
case EQUALS:
case EQUALS -> {
if (negate) {
return " <> ?" + pos + " ";
}
return " = ?" + pos + " ";
case GREATER_THAN:
}
case GREATER_THAN -> {
if (negate) {
return " <= ?" + pos + " ";
}
return " > ?" + pos + " ";
case GREATER_EQUALS:
}
case GREATER_EQUALS -> {
if (negate) {
return " < ?" + pos + " ";
}
return " >= ?" + pos + " ";
case LESS_THAN:
}
case LESS_THAN -> {
if (negate) {
return " >= ?" + pos + " ";
}
return " < ?" + pos + " ";
case LESS_EQUALS:
}
case LESS_EQUALS -> {
if (negate) {
return " > ?" + pos + " ";
}
return " <= ?" + pos + " ";
case BETWEEN:
}
case BETWEEN -> {
if (negate) {
return " NOT BETWEEN ?" + pos + " AND ?" + (pos+1) + " ";
return " NOT BETWEEN ?" + pos + " AND ?" + (pos + 1) + " ";
}
return " BETWEEN ?" + pos + " AND ?" + (pos+1) + " ";
return " BETWEEN ?" + pos + " AND ?" + (pos + 1) + " ";
}
}

throw new IllegalArgumentException("Unsupported operator: " + operator);
Expand Down
Loading

0 comments on commit 9a95ad4

Please sign in to comment.