Skip to content

Commit

Permalink
SCADA-LTS#2854 Safely starting and stopping applications:
Browse files Browse the repository at this point in the history
- corrected sequence starting and stopping Data Points;
- blocking the writing of values before data points are initialized;
- added tests: StartStopDataPointsUtilsTestsSuite
  • Loading branch information
Limraj committed Apr 19, 2024
1 parent 1c001d7 commit b2f7048
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 92 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -219,5 +219,6 @@ test {
includeTestsMatching "org.scada_lts.serorepl.utils.StringUtilsTestsSuite"
includeTestsMatching "org.scada_lts.monitor.ConcurrentMonitoredValuesTest"
includeTestsMatching "com.serotonin.mango.util.AddLimitIfWithoutSqlDataSourceUtilsTest"
includeTestsMatching "com.serotonin.mango.util.StartStopDataPointsUtilsTestsSuite"
}
}
104 changes: 24 additions & 80 deletions src/com/serotonin/mango/rt/RuntimeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import com.serotonin.db.IntValuePair;
import com.serotonin.mango.db.dao.*;
import com.serotonin.mango.rt.dataImage.*;
import com.serotonin.mango.rt.dataSource.PollingDataSource;
import com.serotonin.mango.rt.event.*;
import com.serotonin.mango.rt.event.schedule.ResetDailyLimitSendingEventRT;
import com.serotonin.mango.rt.event.schedule.ScheduledExecuteInactiveEventRT;
import com.serotonin.mango.util.LoggingUtils;
import com.serotonin.mango.util.StartStopDataPointsUtils;
import com.serotonin.mango.view.event.NoneEventRenderer;
import com.serotonin.mango.vo.User;
import com.serotonin.mango.vo.dataSource.PointLocatorVO;
import com.serotonin.mango.vo.dataSource.http.ICheckReactivation;
import com.serotonin.mango.vo.dataSource.meta.MetaPointLocatorVO;
import com.serotonin.mango.vo.mailingList.MailingList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -123,11 +119,12 @@ public class RuntimeManager {


private volatile boolean started = false;
private volatile boolean startedDataPoints = false;

//
// Lifecycle
synchronized public void initialize(boolean safe) {
if (started)
if (started || startedDataPoints)
throw new ShouldNeverHappenException(
"RuntimeManager already started");

Expand Down Expand Up @@ -271,6 +268,8 @@ synchronized public void terminate() {
for (PointLinkRT pointLink : pointLinks)
stopPointLink(pointLink.getId());

stopPoints();

markAsTerminatingAll();

// First stop meta data sources.
Expand Down Expand Up @@ -398,9 +397,6 @@ public void stopDataSource(int id) {
DataSourceRT dataSource = getRunningDataSource(id);
if (dataSource == null)
return;
if(dataSource instanceof PollingDataSource) {
((PollingDataSource)dataSource).markAsTerminating();
}
// Stop the data points.
for (DataPointRT p : dataPoints.values()) {
if (p.getDataSourceId() == id)
Expand All @@ -416,11 +412,7 @@ public void stopDataSource(int id) {
}

public void markAsTerminatingAll() {
for(DataSourceRT dataSource: runningDataSources) {
if(dataSource instanceof PollingDataSource) {
((PollingDataSource)dataSource).markAsTerminating();
}
}
PollingDataSource.markAsTerminating();
}

//
Expand Down Expand Up @@ -518,7 +510,7 @@ private void startDataPointSafe(DataPointVO vo) {
try {
startDataPoint(vo);
} catch (Exception ex) {
LOG.error(ex.getMessage() + ", dataPoint: " + vo.getName() + "(id: " + vo.getId() + ", xid: " + vo.getXid() + "), dataSource: " + vo.getDeviceName() + "(xid: " + vo.getDataSourceXid() + ") : ", ex);
LOG.error(ex.getMessage() + " - " + LoggingUtils.dataPointInfo(vo) + " : ", ex);
stopDataPointSafe(vo.getId());
}
}
Expand All @@ -536,10 +528,16 @@ private void stopDataPoint(int dataPointId) {
if (l != null)
l.pointTerminated();
p.terminate();
DataPointVO point = p.getVO();
LOG.info("Data point '" + point.getExtendedName() + "' stopped");
}
}
}

private void stopDataPointSafe(DataPointVO dataPoint) {
stopDataPointSafe(dataPoint.getId());
}

private void stopDataPointSafe(int dataPointId) {
try {
stopDataPoint(dataPointId);
Expand Down Expand Up @@ -1078,75 +1076,21 @@ public boolean isStarted() {
return started;
}

private void startPoints() {

DataPointService dataPointService = new DataPointService();
List<DataPointVO> allPoints = dataPointService.getDataPoints(null, true);
List<DataPointVO> nonMetaDataPoints = allPoints.stream()
.filter(a -> !(a.getPointLocator() instanceof MetaPointLocatorVO))
.collect(Collectors.toList());
List<DataPointVO> metaDataPoints = allPoints.stream()
.filter(a -> a.getPointLocator() instanceof MetaPointLocatorVO)
.collect(Collectors.toList());

run(nonMetaDataPoints);

List<DataPointVO> toRunning = new ArrayList<>();
Set<Integer> toCheck = new HashSet<>();
int safe = 10;
for(DataPointVO dataPoint: metaDataPoints) {
collectMetaDataPointsFromContext(toCheck, toRunning, dataPoint, safe, allPoints);
}

run(toRunning);
run(metaDataPoints);
public boolean isStartedDataPoints() {
return startedDataPoints;
}

private void run(List<DataPointVO> dataPoints) {
for(DataPointVO dataPoint: dataPoints) {
if(dataPoint.isEnabled()) {
DataPointRT dataPointRT = getDataPoint(dataPoint.getId());
if(dataPointRT == null) {
startDataPointSafe(dataPoint);
}
}
private void startPoints() {
try {
DataPointService dataPointService = new DataPointService();
StartStopDataPointsUtils.startPoints(dataPointService, this::startDataPointSafe, this::getDataPoint, this::getRunningDataSource);
} finally {
this.startedDataPoints = true;
}
}

private void collectMetaDataPointsFromContext(Set<Integer> toCheck, List<DataPointVO> toRunning,
DataPointVO dataPoint, int safe,
List<DataPointVO> allPoints) {
if(safe < 0) {
LOG.error("Recursion level exceeded: " + LoggingUtils.dataPointInfo(dataPoint));
return;
}
if(dataPoint.isEnabled()) {
PointLocatorVO pointLocator = dataPoint.getPointLocator();
if(pointLocator instanceof MetaPointLocatorVO) {
MetaPointLocatorVO metaPointLocator = (MetaPointLocatorVO) pointLocator;
if (metaPointLocator.getContext() != null && !metaPointLocator.getContext().isEmpty()) {
for (IntValuePair intValuePair : metaPointLocator.getContext()) {
if(intValuePair.getKey() > 0) {
DataPointRT dataPointRT = getDataPoint(intValuePair.getKey());
if (dataPointRT == null) {
DataPointVO fromContextDataPoint = allPoints.stream()
.filter(a -> a.getId() == intValuePair.getKey())
.findAny()
.orElse(null);
if (fromContextDataPoint != null) {
if (fromContextDataPoint.getPointLocator() instanceof MetaPointLocatorVO) {
collectMetaDataPointsFromContext(toCheck, toRunning, fromContextDataPoint, --safe, allPoints);
}
if (!toCheck.contains(fromContextDataPoint.getId())) {
toCheck.add(fromContextDataPoint.getId());
toRunning.add(fromContextDataPoint);
}
}
}
}
}
}
}
}
private void stopPoints() {
this.startedDataPoints = false;
StartStopDataPointsUtils.stopPoints(this.dataPoints.values(), this::stopDataPointSafe, this::getDataPoint);
}
}
7 changes: 5 additions & 2 deletions src/com/serotonin/mango/rt/dataImage/DataPointNonSyncRT.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.scada_lts.dao.SystemSettingsDAO;
import org.scada_lts.utils.PointValueStateUtils;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -67,7 +66,11 @@ public DataPointNonSyncRT(DataPointVO vo, PointLocatorRT pointLocator) {

@Override
protected void savePointValue(PointValueTime newValue, SetPointSource source,
boolean async) {
boolean async) {
if(isBlocked()) {
return;
}

// Null values are not very nice, and since they don't have a specific
// meaning they are hereby ignored.
if (newValue == null)
Expand Down
8 changes: 6 additions & 2 deletions src/com/serotonin/mango/rt/dataImage/DataPointRT.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

import static org.scada_lts.utils.PointValueStateUtils.isSetPoint;

public class DataPointRT implements IDataPoint, ILifecycle, TimeoutClient, ScadaWebSockets<MangoValue> {
public class DataPointRT implements IDataPointRT, ILifecycle, TimeoutClient, ScadaWebSockets<MangoValue> {
private static final Log LOG = LogFactory.getLog(DataPointRT.class);
private static final PvtTimeComparator pvtTimeComparator = new PvtTimeComparator();

Expand Down Expand Up @@ -197,7 +197,11 @@ public void setPointValue(PointValueTime newValue, SetPointSource source) {
}

protected void savePointValue(PointValueTime newValue, SetPointSource source,
boolean async) {
boolean async) {
if(isBlocked()) {
return;
}

// Null values are not very nice, and since they don't have a specific
// meaning they are hereby ignored.
if (newValue == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ public DataPointSynchronizedRT(DataPointVO vo, PointLocatorRT pointLocator) {

@Override
protected void savePointValue(PointValueTime newValue, SetPointSource source,
boolean async) {
boolean async) {
if(isBlocked()) {
return;
}

// Null values are not very nice, and since they don't have a specific
// meaning they are hereby ignored.
if (newValue == null)
Expand Down
5 changes: 5 additions & 0 deletions src/com/serotonin/mango/rt/dataImage/IDataPointRT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.serotonin.mango.rt.dataImage;

import com.serotonin.mango.Common;
import com.serotonin.mango.rt.dataImage.types.MangoValue;
import com.serotonin.mango.rt.dataSource.PointLocatorRT;
import com.serotonin.mango.util.timeout.TimeoutClient;
Expand All @@ -26,4 +27,8 @@ public interface IDataPointRT extends IDataPoint, ILifecycle, TimeoutClient,
void initialize();
@Override
void terminate();

default boolean isBlocked() {
return !Common.ctx.getRuntimeManager().isStartedDataPoints();
}
}
6 changes: 3 additions & 3 deletions src/com/serotonin/mango/rt/dataSource/PollingDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ abstract public class PollingDataSource extends DataSourceRT implements TimeoutC
private TimerTask timerTask;
private volatile Thread jobThread;
private long jobThreadStartTime;
private volatile boolean markAsTerminating = false;
private static volatile boolean markAsTerminating = false;

public PollingDataSource(DataSourceVO<?> vo) {
super(vo);
Expand Down Expand Up @@ -153,7 +153,7 @@ public boolean isMarkAsTerminating() {
}
}

public void markAsTerminating() {
this.markAsTerminating = true;
public static void markAsTerminating() {
markAsTerminating = true;
}
}
6 changes: 3 additions & 3 deletions src/com/serotonin/mango/util/LoggingUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ private LoggingUtils() {}
public static String dataPointInfo(DataPointVO dataPoint) {
if(dataPoint == null)
return "";
String info = "datapoint: {0} (id: {1}, xid: {2}, dataSourceId: {3}, dataSourceName: {4})";
return MessageFormat.format(info, dataPoint.getName(), String.valueOf(dataPoint.getId()),
dataPoint.getXid(), dataPoint.getDataSourceId(), dataPoint.getDataSourceName());
String info = "datapoint: {0} (id: {1}, xid: {2}, dataSourceId: {3}, dataSourceXid: {4}, dataSourceName: {5})";
return MessageFormat.format(info, dataPoint.getExtendedName(), String.valueOf(dataPoint.getId()),
dataPoint.getXid(), dataPoint.getDataSourceId(), dataPoint.getDataSourceXid(), dataPoint.getDataSourceName());
}

public static String dataSourceInfo(DataSourceRT dataSource) {
Expand Down
Loading

0 comments on commit b2f7048

Please sign in to comment.