Skip to content

Commit

Permalink
Asset model updates: path now implemented using ltree and always load…
Browse files Browse the repository at this point in the history
…ed, removed parent name and parent type info

AssetQuery select exclusions removed (attributes can still be excluded using empty attributeNames array)
  • Loading branch information
richturner committed Feb 3, 2022
1 parent 0b8d44d commit 9b2265c
Show file tree
Hide file tree
Showing 55 changed files with 553 additions and 752 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package org.openremote.agent.protocol;

import org.openremote.container.persistence.PersistenceEvent;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.ContainerService;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.agent.Agent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.dialect.PostgreSQL95Dialect;
import org.hibernate.dialect.PostgreSQL10Dialect;
import org.openremote.container.PostgreSQL10LTreeDialect;
import org.openremote.container.concurrent.ContainerThreadFactory;

import java.util.Properties;
Expand Down Expand Up @@ -54,7 +55,7 @@ enum Product implements Database {
@Override
public Properties createProperties() {
Properties properties = new Properties();
properties.put(AvailableSettings.DIALECT, PostgreSQL95Dialect.class.getName());
properties.put(AvailableSettings.DIALECT, PostgreSQL10LTreeDialect.class.getName());
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.hibernate.Transaction;
import org.hibernate.type.Type;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.model.PersistenceEvent;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,16 +43,14 @@
public class PersistenceEventInterceptor extends EmptyInterceptor {

private static final Logger LOG = Logger.getLogger(PersistenceEventInterceptor.class.getName());

protected MessageBrokerService messageBrokerService;
protected Consumer<PersistenceEvent<?>> eventConsumer;
protected Set<PersistenceEvent<?>> persistenceEvents = new HashSet<>();

public void setMessageBrokerService(MessageBrokerService messageBrokerService) {
this.messageBrokerService = messageBrokerService;
public void setEventConsumer(Consumer<PersistenceEvent<?>> eventConsumer) {
this.eventConsumer = eventConsumer;
}

@Override
@SuppressWarnings("unchecked")
public boolean onSave(Object entity, Serializable id,
Object[] state, String[] propertyNames, Type[] types)
throws CallbackException {
Expand All @@ -64,7 +64,6 @@ public boolean onSave(Object entity, Serializable id,
}

@Override
@SuppressWarnings("unchecked")
public boolean onFlushDirty(Object entity, Serializable id,
Object[] currentState, Object[] previousState,
String[] propertyNames, Type[] types)
Expand All @@ -80,7 +79,6 @@ public boolean onFlushDirty(Object entity, Serializable id,
}

@Override
@SuppressWarnings("unchecked")
public void onDelete(Object entity, Serializable id,
Object[] state,
String[] propertyNames,
Expand All @@ -106,20 +104,14 @@ public void afterCompletion(int status) {
if (status != Status.STATUS_COMMITTED)
return;

if (messageBrokerService.getProducerTemplate() == null) {
// Message broker not started yet
if (eventConsumer == null) {
// Event consumer not set
return;
}

for (PersistenceEvent<?> persistenceEvent : persistenceEvents) {
try {
messageBrokerService.getProducerTemplate().sendBodyAndHeader(
PersistenceEvent.PERSISTENCE_TOPIC,
ExchangePattern.InOnly,
persistenceEvent,
PersistenceEvent.HEADER_ENTITY_TYPE,
persistenceEvent.getEntity().getClass()
);
eventConsumer.accept(persistenceEvent);
} catch (CamelExecutionException ex) {
// TODO Better error handling?
LOG.log(Level.SEVERE, "Error dispatching: " + persistenceEvent + " - " + ex, ex);
Expand All @@ -135,9 +127,4 @@ public void afterCompletion(int status) {
@Override
public void afterTransactionCompletion(Transaction tx) {
}

@Override
public int[] findDirty(Object entity, Serializable id, Object[] currentState, Object[] previousState, String[] propertyNames, Type[] types) {
return super.findDirty(entity, id, currentState, previousState, propertyNames, types);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.openremote.container.persistence;

import org.apache.camel.ExchangePattern;
import org.apache.camel.Predicate;
import org.apache.camel.ProducerTemplate;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationInfo;
import org.flywaydb.core.api.output.MigrateResult;
Expand All @@ -28,10 +30,9 @@
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.jpa.boot.internal.EntityManagerFactoryBuilderImpl;
import org.hibernate.jpa.boot.internal.PersistenceUnitInfoDescriptor;
import org.hibernate.jpa.boot.spi.EntityManagerFactoryBuilder;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.EntityClassProvider;
import org.openremote.model.*;
import org.openremote.model.apps.ConsoleAppConfig;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.AssetDescriptor;
Expand Down Expand Up @@ -68,7 +69,7 @@

import static org.openremote.container.util.MapAccess.*;

public class PersistenceService implements ContainerService {
public class PersistenceService implements ContainerService, Consumer<PersistenceEvent<?>> {

/**
* Programmatic definition of OpenRemotePU for hibernate
Expand Down Expand Up @@ -182,6 +183,11 @@ public ClassLoader getNewTempClassLoader() {
}
}

// TODO: Make configurable
public static final String PERSISTENCE_TOPIC =
"seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&limitConcurrentConsumers=false&size=25000";
public static final String HEADER_ENTITY_TYPE = PersistenceEvent.class.getSimpleName() + ".ENTITY_TYPE";

private static final Logger LOG = Logger.getLogger(PersistenceService.class.getName());

/**
Expand Down Expand Up @@ -221,12 +227,18 @@ public ClassLoader getNewTempClassLoader() {
protected String persistenceUnitName;
protected Properties persistenceUnitProperties;
protected EntityManagerFactory entityManagerFactory;

protected Flyway flyway;
protected boolean forceClean;
protected Set<String> defaultSchemaLocations = new HashSet<>();
protected Set<String> schemas = new HashSet<>();

public static Predicate isPersistenceEventForEntityType(Class<?> type) {
return exchange -> {
Class<?> entityType = exchange.getIn().getHeader(HEADER_ENTITY_TYPE, Class.class);
return type.isAssignableFrom(entityType);
};
}

@Override
public int getPriority() {
return PRIORITY;
Expand Down Expand Up @@ -261,13 +273,16 @@ public void init(Container container) throws Exception {

if (messageBrokerService != null) {
persistenceUnitProperties.put(
org.hibernate.cfg.AvailableSettings.SESSION_SCOPED_INTERCEPTOR,
AvailableSettings.SESSION_SCOPED_INTERCEPTOR,
PersistenceEventInterceptor.class.getName()
);
}

persistenceUnitProperties.put(AvailableSettings.DEFAULT_SCHEMA, dbSchema);

// Add custom integrator so we can register a custom flush entity event listener
persistenceUnitProperties.put(EntityManagerFactoryBuilderImpl.INTEGRATOR_PROVIDER, IntegratorProvider.class.getName());

persistenceUnitName = getString(container.getConfig(), PERSISTENCE_UNIT_NAME, PERSISTENCE_UNIT_NAME_DEFAULT);

forceClean = getBoolean(container.getConfig(), SETUP_WIPE_CLEAN_INSTALL, container.isDevMode());
Expand Down Expand Up @@ -353,7 +368,7 @@ public EntityManager createEntityManager() {
Session session = entityManager.unwrap(Session.class);
PersistenceEventInterceptor persistenceEventInterceptor =
(PersistenceEventInterceptor) ((SharedSessionContractImplementor) session).getInterceptor();
persistenceEventInterceptor.setMessageBrokerService(messageBrokerService);
persistenceEventInterceptor.setEventConsumer(this);
}

return entityManager;
Expand Down Expand Up @@ -439,10 +454,10 @@ public void publishPersistenceEvent(PersistenceEvent.Cause cause, Object entity,

if (messageBrokerService.getProducerTemplate() != null) {
messageBrokerService.getProducerTemplate().sendBodyAndHeader(
PersistenceEvent.PERSISTENCE_TOPIC,
PERSISTENCE_TOPIC,
ExchangePattern.InOnly,
persistenceEvent,
PersistenceEvent.HEADER_ENTITY_TYPE,
HEADER_ENTITY_TYPE,
persistenceEvent.getEntity().getClass()
);
}
Expand Down Expand Up @@ -502,6 +517,22 @@ protected void appendSchemas(List<String> schemas) {
schemas.addAll(this.schemas);
}

@Override
public void accept(PersistenceEvent<?> persistenceEvent) {

ProducerTemplate producerTemplate = messageBrokerService.getProducerTemplate();

if (producerTemplate != null) {
producerTemplate.sendBodyAndHeader(
PersistenceService.PERSISTENCE_TOPIC,
ExchangePattern.InOnly,
persistenceEvent,
PersistenceService.HEADER_ENTITY_TYPE,
persistenceEvent.getEntity().getClass()
);
}
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
@TypeDef(
name = EpochMillisInstantType.TYPE_NAME,
typeClass = EpochMillisInstantType.class
),
@TypeDef(
name = LTreeType.TYPE,
typeClass = LTreeType.class
)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
*/
package org.openremote.manager.agent;

import groovy.util.MapEntry;
import org.apache.camel.builder.RouteBuilder;
import org.openremote.agent.protocol.ProtocolAssetService;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceEvent;
import org.openremote.model.PersistenceEvent;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.asset.AssetProcessingException;
import org.openremote.manager.asset.AssetProcessingService;
Expand Down Expand Up @@ -69,8 +68,8 @@
import static java.util.stream.Collectors.toList;
import static org.openremote.container.concurrent.GlobalLock.withLock;
import static org.openremote.container.concurrent.GlobalLock.withLockReturning;
import static org.openremote.container.persistence.PersistenceEvent.PERSISTENCE_TOPIC;
import static org.openremote.container.persistence.PersistenceEvent.isPersistenceEventForEntityType;
import static org.openremote.container.persistence.PersistenceService.PERSISTENCE_TOPIC;
import static org.openremote.container.persistence.PersistenceService.isPersistenceEventForEntityType;
import static org.openremote.manager.asset.AssetProcessingService.ASSET_QUEUE;
import static org.openremote.manager.gateway.GatewayService.isNotForGateway;
import static org.openremote.model.asset.agent.Protocol.ACTUATOR_TOPIC;
Expand Down
Loading

0 comments on commit 9b2265c

Please sign in to comment.