From 1cbb114903eefa31a7c34812e34a588ae68b1ae9 Mon Sep 17 00:00:00 2001 From: Alex Karpovich Date: Mon, 3 Jun 2024 14:42:20 +0300 Subject: [PATCH] [+] support metrics --- java/build.gradle | 2 + .../qsrv/hf/tickdb/pub/mon/TBMonitor.java | 4 + .../replication/ReplicationOptions.java | 2 + .../util/vsocket/VSServerFramework.java | 5 + java/timebase/commons/build.gradle | 2 + .../deltix/qsrv/comm/cat/TomcatRunner.java | 28 +++ .../qsrv/config/QuantServiceConfig.java | 5 + .../qsrv/util/metrics/MetricsService.java | 169 ++++++++++++++++++ java/timebase/server/build.gradle | 2 + .../hf/tickdb/impl/mon/TBMonitorImpl.java | 20 +++ .../impl/topic/TopicSupportWrapper.java | 10 ++ .../web/controller/MetricsController.java | 95 ++++++++++ 12 files changed, 344 insertions(+) create mode 100644 java/timebase/commons/src/main/java/com/epam/deltix/qsrv/util/metrics/MetricsService.java create mode 100644 java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/web/controller/MetricsController.java diff --git a/java/build.gradle b/java/build.gradle index 22ebd106..66c1aed4 100644 --- a/java/build.gradle +++ b/java/build.gradle @@ -208,6 +208,8 @@ configure(leafProjects) { // entry 'hadoop-yarn-common' // } + dependency 'io.micrometer:micrometer-registry-prometheus:1.7.3' + dependencySet(group: 'org.apache.tomcat.embed', version: '8.0.53') { entry 'tomcat-embed-core' // Tomcat core entry 'tomcat-embed-jasper' // Tomcat JSP support diff --git a/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/pub/mon/TBMonitor.java b/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/pub/mon/TBMonitor.java index 4f8fb4f9..1981d7e8 100644 --- a/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/pub/mon/TBMonitor.java +++ b/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/pub/mon/TBMonitor.java @@ -37,4 +37,8 @@ public interface TBMonitor { public void removeObjectMonitor(TBObjectMonitor monitor); public void addPropertyMonitor(String component, PropertyMonitor monitor); + + int cursorsCount(); + + int loadersCount(); } \ No newline at end of file diff --git a/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/replication/ReplicationOptions.java b/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/replication/ReplicationOptions.java index a5a7930e..ec2a0941 100644 --- a/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/replication/ReplicationOptions.java +++ b/java/timebase/api/src/main/java/com/epam/deltix/qsrv/hf/tickdb/replication/ReplicationOptions.java @@ -27,4 +27,6 @@ public class ReplicationOptions extends CommonOptions { public long threshold = 100000; public boolean format = false; public int flush = 0; // flush loader every 'flush' sends + + public String[] spaces; } \ No newline at end of file diff --git a/java/timebase/api/src/main/java/com/epam/deltix/util/vsocket/VSServerFramework.java b/java/timebase/api/src/main/java/com/epam/deltix/util/vsocket/VSServerFramework.java index 812c366c..b5b42e31 100644 --- a/java/timebase/api/src/main/java/com/epam/deltix/util/vsocket/VSServerFramework.java +++ b/java/timebase/api/src/main/java/com/epam/deltix/util/vsocket/VSServerFramework.java @@ -88,6 +88,11 @@ public QuickExecutor getExecutor () { return executor; } + public int getDispatchersCount() { + synchronized (dispatchers) { + return dispatchers.size(); + } + } public VSDispatcher [] getDispatchers () { VSDispatcher [] ret; diff --git a/java/timebase/commons/build.gradle b/java/timebase/commons/build.gradle index b15dd33a..dd50afe7 100644 --- a/java/timebase/commons/build.gradle +++ b/java/timebase/commons/build.gradle @@ -22,6 +22,8 @@ dependencies { implementation 'javax.servlet:javax.servlet-api:3.1.0' // Tomcat Requires implementation 'org.glassfish:javax.el:3.0.0' // Tomcat Requires + implementation 'io.micrometer:micrometer-registry-prometheus' + implementation 'javax.mail:mail' implementation 'commons-collections:commons-collections' diff --git a/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/comm/cat/TomcatRunner.java b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/comm/cat/TomcatRunner.java index c3ec5bec..c4bc158a 100644 --- a/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/comm/cat/TomcatRunner.java +++ b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/comm/cat/TomcatRunner.java @@ -20,6 +20,7 @@ import com.epam.deltix.qsrv.config.QuantServiceConfig; import com.epam.deltix.qsrv.config.QuantServiceConfig.Type; import com.epam.deltix.qsrv.config.ServiceExecutor; +import com.epam.deltix.qsrv.util.metrics.MetricsService; import com.epam.deltix.qsrv.util.servlet.AccessFilter; import com.epam.deltix.snmp.QuantServerSnmpObjectContainer; import com.epam.deltix.snmp.SNMPTransportFactory; @@ -113,6 +114,15 @@ public void init() throws Exception { mCat.setConnectionHandler(QuantServerExecutor.HANDLER); + if (config.tb != null) { + if (isMetricsServiceEnabled(config.tb, MetricsService.ENABLE_TIMEBASE_METRICS)) { + MetricsService.init(config.tb.getHost(), config.port, + isJvmMetricsEnabled(config.tb, MetricsService.ENABLE_JVM_TIMEBASE_METRICS), + isTomcatMetricsEnabled(config.tb, MetricsService.ENABLE_TOMCAT_TIMEBASE_METRICS) + ); + } + } + if (config.tb != null) { ServiceExecutor tb = config.getExecutor(Type.TimeBase); tb.run(config.tb); @@ -288,4 +298,22 @@ private void setRemoteAccess() { } } } + + private boolean isMetricsServiceEnabled(QuantServiceConfig config, boolean enabled) { + return enabled || (config != null && config.getBoolean(QuantServiceConfig.ENABLE_METRICS, false)); + } + + private boolean isJvmMetricsEnabled(QuantServiceConfig config, boolean enabled) { + return enabled || + (config != null && + config.getBoolean(QuantServiceConfig.ENABLE_METRICS, false) && + config.getBoolean(QuantServiceConfig.ENABLE_JVM_METRICS, false)); + } + + private boolean isTomcatMetricsEnabled(QuantServiceConfig config, boolean enabled) { + return enabled || + (config != null && + config.getBoolean(QuantServiceConfig.ENABLE_METRICS, false) && + config.getBoolean(QuantServiceConfig.ENABLE_TOMCAT_METRICS, false)); + } } \ No newline at end of file diff --git a/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/config/QuantServiceConfig.java b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/config/QuantServiceConfig.java index 494359a8..b971479d 100644 --- a/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/config/QuantServiceConfig.java +++ b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/config/QuantServiceConfig.java @@ -71,6 +71,11 @@ public class QuantServiceConfig { public static final String WEBAPP_PATH = "webapp.path"; + public static final String ENABLE_METRICS = "metrics.enable"; + public static final String ENABLE_JVM_METRICS = "metrics.enableJvmMetrics"; + public static final String ENABLE_TOMCAT_METRICS = "metrics.enableTomcatMetrics"; + + public enum Type { TimeBase, QuantServer diff --git a/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/util/metrics/MetricsService.java b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/util/metrics/MetricsService.java new file mode 100644 index 00000000..fca7c3e4 --- /dev/null +++ b/java/timebase/commons/src/main/java/com/epam/deltix/qsrv/util/metrics/MetricsService.java @@ -0,0 +1,169 @@ +package com.epam.deltix.qsrv.util.metrics; + +import com.epam.deltix.gflog.api.Log; +import com.epam.deltix.gflog.api.LogFactory; +import com.epam.deltix.util.lang.Disposable; +import com.epam.deltix.util.time.TimeKeeper; +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; +import io.micrometer.core.instrument.binder.system.ProcessorMetrics; +import io.micrometer.core.instrument.binder.system.UptimeMetrics; +import io.micrometer.core.instrument.binder.tomcat.TomcatMetrics; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.common.TextFormat; + +import java.util.function.ToDoubleFunction; + +import static java.util.Collections.emptyList; + +public class MetricsService implements Disposable { + + // the property was left for backward compatibility + private static final String ENABLE_TIMEBASE_METRICS_PROP = "TimeBase.metrics.enable"; + public static final boolean ENABLE_TIMEBASE_METRICS = Boolean.getBoolean(ENABLE_TIMEBASE_METRICS_PROP); + + private static final String ENABLE_AGGREGATOR_METRICS_PROP = "Aggregator.metrics.enable"; + public static final boolean ENABLE_AGGREGATOR_METRICS = Boolean.getBoolean(ENABLE_AGGREGATOR_METRICS_PROP); + + private static final String ENABLE_JVM_TIMEBASE_METRICS_PROP = "TimeBase.metrics.enableJvmMetrics"; + public static final boolean ENABLE_JVM_TIMEBASE_METRICS = Boolean.getBoolean(ENABLE_JVM_TIMEBASE_METRICS_PROP); + private static final String ENABLE_TOMCAT_TIMEBASE_METRICS_PROP = "TimeBase.metrics.enableTomcatMetrics"; + public static final boolean ENABLE_TOMCAT_TIMEBASE_METRICS = Boolean.getBoolean(ENABLE_TOMCAT_TIMEBASE_METRICS_PROP); + + private static final String ENABLE_JVM_AGGREGATOR_METRICS_PROP = "Aggregator.metrics.enableJvmMetrics"; + public static final boolean ENABLE_JVM_AGGREGATOR_METRICS = Boolean.getBoolean(ENABLE_JVM_AGGREGATOR_METRICS_PROP); + private static final String ENABLE_TOMCAT_AGGREGATOR_METRICS_PROP = "Aggregator.metrics.enableTomcatMetrics"; + public static final boolean ENABLE_TOMCAT_AGGREGATOR_METRICS = Boolean.getBoolean(ENABLE_TOMCAT_AGGREGATOR_METRICS_PROP); + + private static final MetricsService INSTANCE = new MetricsService(); + + public static MetricsService getInstance() { + return INSTANCE; + } + + public synchronized static void init(String host, int port, boolean enableJvmMetrics, boolean enableTomcatMetrics) { + MetricsService metrics = getInstance(); + if (metrics.registry == null) { + metrics.registry = new PrometheusMeterRegistry( + PrometheusConfig.DEFAULT, CollectorRegistry.defaultRegistry, + new Clock() { + @Override + public long wallTime() { + return TimeKeeper.currentTime; + } + + @Override + public long monotonicTime() { + return System.nanoTime(); + } + } + ); + + metrics.registry.config().commonTags("host", host, "port", String.valueOf(port)); + + if (enableJvmMetrics) { + new ClassLoaderMetrics().bindTo(metrics.registry); + new JvmMemoryMetrics().bindTo(metrics.registry); + new ProcessorMetrics().bindTo(metrics.registry); + new JvmThreadMetrics().bindTo(metrics.registry); + new UptimeMetrics().bindTo(metrics.registry); + + metrics.jvmGcMetrics = new JvmGcMetrics(); + metrics.jvmGcMetrics.bindTo(metrics.registry); + } + + if (enableTomcatMetrics) { + metrics.tomcatMetrics = new TomcatMetrics(null, emptyList()); + metrics.tomcatMetrics.bindTo(metrics.registry); + } + } + } + + private final static Log LOGGER = LogFactory.getLog(MetricsService.class); + + private volatile PrometheusMeterRegistry registry; + private volatile JvmGcMetrics jvmGcMetrics; + private volatile TomcatMetrics tomcatMetrics; + + private MetricsService() { + } + + public boolean initialized() { + return registry != null; + } + + public T registerGauge(String name, T number) { + if (initialized()) { + return registry.gauge(name, number); + } + + return number; + } + + public T registerGauge(String name, T object, ToDoubleFunction function) { + if (initialized()) { + return registry.gauge(name, object, function); + } + + return object; + } + + public String scrape() { + checkIsNotConfigured(); + return registry.scrape(TextFormat.CONTENT_TYPE_OPENMETRICS_100); + } + + private void checkIsNotConfigured() { + if (!initialized()) { + throw new IllegalStateException("Metrics Service is not configured."); + } + } + + @Override + public synchronized void close() { + closeJvmMetrics(); + closeTomcatMetrics(); + closeMetricsRegistry(); + } + + private void closeJvmMetrics() { + try { + if (jvmGcMetrics != null) { + jvmGcMetrics.close(); + } + } catch (Throwable t) { + LOGGER.error().append("Failed to close JvmGcMetrics").append(t).commit(); + } finally { + jvmGcMetrics = null; + } + } + + private void closeTomcatMetrics() { + try { + if (tomcatMetrics != null) { + tomcatMetrics.close(); + } + } catch (Throwable t) { + LOGGER.error().append("Failed to close JvmGcMetrics").append(t).commit(); + } finally { + tomcatMetrics = null; + } + } + + private void closeMetricsRegistry() { + try { + if (registry != null) { + registry.close(); + } + } catch (Throwable t) { + LOGGER.error().append("Failed to close PrometheusMeterRegistry").append(t).commit(); + } finally { + registry = null; + } + } +} diff --git a/java/timebase/server/build.gradle b/java/timebase/server/build.gradle index 15123656..880b9836 100644 --- a/java/timebase/server/build.gradle +++ b/java/timebase/server/build.gradle @@ -40,6 +40,8 @@ dependencies { implementation 'com.epam.deltix:gflog-api', 'com.epam.deltix:gflog-core', 'com.epam.deltix:gflog-jul' + implementation 'io.micrometer:micrometer-registry-prometheus' + // web application implementation 'org.springframework:spring-webmvc' diff --git a/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/mon/TBMonitorImpl.java b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/mon/TBMonitorImpl.java index 374815ec..35278372 100644 --- a/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/mon/TBMonitorImpl.java +++ b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/mon/TBMonitorImpl.java @@ -22,6 +22,7 @@ import com.epam.deltix.util.lang.Util; import net.jcip.annotations.GuardedBy; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -53,6 +54,9 @@ public abstract class TBMonitorImpl implements TBMonitor { private final CharSequenceToObjectMap locks = new CharSequenceToObjectMap<>(); + private final AtomicInteger cursorsCount = new AtomicInteger(); + private final AtomicInteger loadersCount = new AtomicInteger(); + private volatile boolean trackMessagesByInstrument = false; public final boolean getTrackMessages () { @@ -118,6 +122,7 @@ public final long registerLoader (TBLoader loader) { long index; synchronized (dbLock) { loaders.add (loader); + loadersCount.set(loaders.size()); index = idSequence++; monObjects.put (index, loader); } @@ -130,6 +135,7 @@ public final long registerCursor (TBCursor cursor) { long index; synchronized (dbLock) { cursors.add (cursor); + cursorsCount.set(cursors.size()); index = idSequence++; monObjects.put (index, cursor); } @@ -145,8 +151,11 @@ public final void unregisterLoader (TBLoader loader) if (loader != monObjects.remove (loader.getId (), null)) throw new RuntimeException (); + + loadersCount.set(loaders.size()); } + fireObjectRemoved(loader, loader.getId()); } @@ -157,6 +166,8 @@ public final void unregisterCursor (TBCursor cursor) if (cursor != monObjects.remove (cursor.getId (), null)) throw new RuntimeException (); + + cursorsCount.set(cursors.size()); } fireObjectRemoved(cursor, cursor.getId()); @@ -236,4 +247,13 @@ public void unregisterLock(TBLock lock) { fireObjectRemoved(lock, lock.getId()); } + public int cursorsCount() { + return cursorsCount.get(); + } + + public int loadersCount() { + return loadersCount.get(); + } + + } \ No newline at end of file diff --git a/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/topic/TopicSupportWrapper.java b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/topic/TopicSupportWrapper.java index 9638f409..bd783cb5 100644 --- a/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/topic/TopicSupportWrapper.java +++ b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/impl/topic/TopicSupportWrapper.java @@ -95,4 +95,14 @@ public void close() { } super.close(); } + + @Override + public int cursorsCount() { + return 0; + } + + @Override + public int loadersCount() { + return 0; + } } \ No newline at end of file diff --git a/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/web/controller/MetricsController.java b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/web/controller/MetricsController.java new file mode 100644 index 00000000..167c9aa5 --- /dev/null +++ b/java/timebase/server/src/main/java/com/epam/deltix/qsrv/hf/tickdb/web/controller/MetricsController.java @@ -0,0 +1,95 @@ +package com.epam.deltix.qsrv.hf.tickdb.web.controller; + +import com.epam.deltix.gflog.api.Log; +import com.epam.deltix.gflog.api.LogFactory; +import com.epam.deltix.qsrv.hf.tickdb.http.AbstractHandler; +import com.epam.deltix.qsrv.hf.tickdb.pub.mon.TBMonitor; +import com.epam.deltix.qsrv.util.metrics.MetricsService; +import com.epam.deltix.util.time.TimeKeeper; +import com.epam.deltix.util.vsocket.VSServerFramework; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.PostConstruct; + +/** + * + */ +@Controller +@RequestMapping(value = "/") +public class MetricsController { + + private final static Log LOGGER = LogFactory.getLog(MetricsController.class); + + private final static long MAX_RESOLUTION_MS = 1000; + private final static String NUM_LOADERS_METRIC = "timebase.num_loaders"; + private final static String NUM_CURSORS_METRIC = "timebase.num_cursors"; + private final static String NUM_CONNECTIONS_METRIC = "timebase.connections.num_connections"; + private final static String TRAFFIC_RATE_METRIC = "timebase.connections.traffic_rate"; + private final static String AVAILABLE_MEMORY_METRIC = "timebase.memory.available"; + + private final static String LICENCE_DAYS_VALID_METRIC = "timebase.license.days_valid"; + private final static Runtime RUNTIME = Runtime.getRuntime(); + + private long lastRequestTime = TimeKeeper.currentTime; + private ResponseEntity cachedResponse = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE) + .body("Metrics service is not available"); + + public MetricsController() { + } + + @PostConstruct + public void init() { + if (MetricsService.getInstance().initialized()) { + if (AbstractHandler.TDB instanceof TBMonitor) { + MetricsService.getInstance().registerGauge(NUM_LOADERS_METRIC, + (TBMonitor) AbstractHandler.TDB, TBMonitor::loadersCount + ); + MetricsService.getInstance().registerGauge(NUM_CURSORS_METRIC, + (TBMonitor) AbstractHandler.TDB, TBMonitor::cursorsCount + ); + } + + MetricsService.getInstance().registerGauge(NUM_CONNECTIONS_METRIC, + VSServerFramework.INSTANCE, VSServerFramework::getDispatchersCount + ); + MetricsService.getInstance().registerGauge(TRAFFIC_RATE_METRIC, + VSServerFramework.INSTANCE, VSServerFramework::getThroughput + ); + MetricsService.getInstance().registerGauge(AVAILABLE_MEMORY_METRIC, + RUNTIME, (r) -> r.maxMemory() - r.totalMemory() + r.freeMemory() + ); + } + } + + @RequestMapping(value = "/metrics", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody + public ResponseEntity metrics() { + long currentTime = TimeKeeper.currentTime; + if (currentTime - lastRequestTime > MAX_RESOLUTION_MS) { + lastRequestTime = currentTime; + return cachedResponse = scrape(); + } + + return cachedResponse; + } + + private ResponseEntity scrape() { + try { + if (MetricsService.getInstance().initialized()) { + return ResponseEntity.ok().body(MetricsService.getInstance().scrape()); + } + } catch (Throwable t) { + LOGGER.error().append("Failed to scrape metrics").append(t).commit(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to scrape metrics"); + } + + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body("Metrics service is not available"); + } + +}