Skip to content

Commit

Permalink
LGouellec#218 - Change the internal middleware behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Mar 23, 2023
1 parent 6f66c5d commit 9c2d472
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 64 deletions.
14 changes: 10 additions & 4 deletions core/IStreamMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Threading;

namespace Streamiz.Kafka.Net
{
/// <summary>
Expand All @@ -9,21 +11,25 @@ public interface IStreamMiddleware
/// Middleware function called before starting streaming application
/// </summary>
/// <param name="config">Actual configuration</param>
void BeforeStart(IStreamConfig config);
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
void BeforeStart(IStreamConfig config, CancellationToken token);
/// <summary>
/// Middleware function called after starting streaming application
/// </summary>
/// <param name="config">Actual configuration</param>
void AfterStart(IStreamConfig config);
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
void AfterStart(IStreamConfig config, CancellationToken token);
/// <summary>
/// Middleware function called before stopping streaming application
/// </summary>
/// <param name="config">Actual configuration</param>
void BeforeStop(IStreamConfig config);
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
void BeforeStop(IStreamConfig config, CancellationToken token);
/// <summary>
/// Middleware function called after stopping streaming application
/// </summary>
/// <param name="config">Actual configuration</param>
void AfterStop(IStreamConfig config);
/// <param name="token">Token for propagates notification that the stream should be canceled.</param>
void AfterStop(IStreamConfig config, CancellationToken token);
}
}
7 changes: 5 additions & 2 deletions core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ public KafkaStream(Topology topology, IStreamConfig configuration, IKafkaSupplie

int numStreamThreads = topology.Builder.HasNoNonGlobalTopology ? 0 : configuration.NumStreamThreads;

string Protect(string str)
=> str.Replace("\n", "\\n");

GeneralClientMetrics.StreamsAppSensor(
configuration.ApplicationId,
topology.Describe().ToString(),
Protect(topology.Describe().ToString()),
() => StreamState != null && StreamState.IsRunning() ? 1 : 0,
() => threads.Count(t => t.State != ThreadState.DEAD && t.State != ThreadState.PENDING_SHUTDOWN),
metricsRegistry);
Expand Down Expand Up @@ -633,7 +636,7 @@ private void RunMiddleware(bool before, bool start)
var methods = typeof(IStreamMiddleware).GetMethods();
logger.LogInformation($"{logPrefix}Starting middleware {methods[index].Name.ToLowerInvariant()}");
foreach (var middleware in configuration.Middlewares)
methods[index].Invoke(middleware, new object[] {configuration});
methods[index].Invoke(middleware, new object[] {configuration, _cancelSource.Token});
logger.LogInformation($"{logPrefix}Middleware {methods[index].Name.ToLowerInvariant()} done");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading;
using OpenTelemetry.Metrics;

namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry
Expand All @@ -14,19 +15,19 @@ public OpenTelemetryRunner(MeterProvider meterProvider,
this.openTelemetryMetricsExporter = openTelemetryMetricsExporter;
}

public void BeforeStart(IStreamConfig config)
public void BeforeStart(IStreamConfig config, CancellationToken token)
{
}

public void AfterStart(IStreamConfig config)
public void AfterStart(IStreamConfig config, CancellationToken token)
{
}

public void BeforeStop(IStreamConfig config)
public void BeforeStop(IStreamConfig config, CancellationToken token)
{
}

public void AfterStop(IStreamConfig config)
public void AfterStop(IStreamConfig config, CancellationToken token)
{
meterProvider.ForceFlush();
meterProvider.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

namespace Streamiz.Kafka.Net.Metrics.Prometheus
{
internal class Gauge2
internal class Gauge
{
public string Key { get; set; }
public string Description { get; set; }
public IReadOnlyDictionary<string, string> Labels { get; set; }
public double Value { get; private set; }

public Gauge2(string key, string description, IReadOnlyDictionary<string, string> labels)
public Gauge(string key, string description, IReadOnlyDictionary<string, string> labels)
{
Key = key;
Description = description;
Labels = labels;
}

public Gauge2 SetValue(double value)
public Gauge SetValue(double value)
{
Value = value;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal void Expose(IEnumerable<Sensor> sensors)
foreach (var metric in sensors.SelectMany(s => s.Metrics))
{
var metricKey = MetricKey(metric.Value);
var newGauge = new Gauge2(metricKey, metric.Key.Description, metric.Value.Tags);
var newGauge = new Gauge(metricKey, metric.Key.Description, metric.Value.Tags);
MetricsServer.AddGauge(newGauge, metric);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,57 @@
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Prometheus;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Metrics.Prometheus
{
public class PrometheusMetricServer : MetricServer
public class PrometheusMetricServer : IDisposable
{
private readonly HttpListener _httpListener = new HttpListener();
private static readonly object @lock = new object();
private readonly HttpListener _httpListener = new ();
private static readonly object @lock = new();
private readonly IList<(Gauge, StreamMetric)> gauges = new List<(Gauge, StreamMetric)>();
private CollectorRegistry collectorRegistry;
private Task? task;

public PrometheusMetricServer(int port, string url = "metrics/", CollectorRegistry registry = null, bool useHttps = false)
: this("+", port, url, registry, useHttps)
public PrometheusMetricServer(int port, string url = "metrics/", bool useHttps = false)
: this("+", port, url, useHttps)
{ }

private PrometheusMetricServer(string hostname, int port, string url = "metrics/", CollectorRegistry registry = null, bool useHttps = false)
: base(hostname, port, url, registry, useHttps)
private PrometheusMetricServer(string hostname, int port, string url = "metrics/", bool useHttps = false)
{
var s = useHttps ? "s" : "";
_httpListener.Prefixes.Add($"http{s}://{hostname}:{port}/{url}");
_httpListener.Prefixes.Add($"http{s}://{hostname}:{port}/");
}

protected override Task StartServer(CancellationToken cancel)
public void Start(CancellationToken token)
{
if (task != null)
throw new InvalidOperationException("The metric server has already been started.");

task = StartServer(token);
}

public async Task StopAsync()
{
try
{
if (task == null)
return;
await task.ConfigureAwait(false);
}
catch (OperationCanceledException ex)
{
}
}

public void Stop() => StopAsync().GetAwaiter().GetResult();

void IDisposable.Dispose() => Stop();

protected Task StartServer(CancellationToken cancel)
{
// This will ensure that any failures to start are nicely thrown from StartServerAsync.
_httpListener.Start();
Expand Down Expand Up @@ -59,44 +84,50 @@ protected override Task StartServer(CancellationToken cancel)

try
{
List<Gauge> tmpGauges = null;

lock (@lock)
{
foreach (var metric in gauges)
{
Double value;
if(Crosscutting.Utils.IsNumeric(metric.Item2.Value, out value))
metric.Item1.WithLabels(metric.Item2.Tags.Values.ToArray()).Set(value);
if(Utils.IsNumeric(metric.Item2.Value, out value))
metric.Item1.SetValue(value);
else
metric.Item1.WithLabels(metric.Item2.Tags.Values.ToArray()).Set(1);
metric.Item1.SetValue(1);
}

tmpGauges = gauges.Select(g => g.Item1).ToList();
}

using (MemoryStream ms = new MemoryStream())
using (var ms = new MemoryStream())
{
if (collectorRegistry != null)
{
await collectorRegistry.CollectAndExportAsTextAsync(ms, cancel);

ms.Position = 0;

response.ContentType = PrometheusConstants.ExporterContentType;
response.StatusCode = 200;
await ms.CopyToAsync(response.OutputStream, 81920, cancel);
}
foreach (var metric in tmpGauges)
ExportMetricAsTest(ms, metric);

await ms.FlushAsync(cancel);

ms.Seek(0, SeekOrigin.Begin);

response.ContentType = "text/plain";
response.StatusCode = 200;
//var memoryBuffer = ms.GetBuffer();
//response.OutputStream.Write(memoryBuffer, 0, memoryBuffer.Length);
await ms.CopyToAsync(response.OutputStream, 2048*2*2, cancel);
}

response.OutputStream.Dispose();
}
catch (ScrapeFailedException ex)
catch (Exception ex)
{
// This can only happen before anything is written to the stream, so it
// should still be safe to update the status code and report an error.
response.StatusCode = 503;

if (!string.IsNullOrWhiteSpace(ex.Message))
{
using(var writer = new StreamWriter(response.OutputStream))
writer.Write(ex.Message);
using var writer = new StreamWriter(response.OutputStream);
await writer.WriteAsync(ex.Message);
}
}
}
Expand All @@ -105,7 +136,7 @@ protected override Task StartServer(CancellationToken cancel)
if (!_httpListener.IsListening)
return; // We were shut down.

Trace.WriteLine(string.Format("Error in {0}: {1}", nameof(MetricServer), ex));
Trace.WriteLine(string.Format("Error in {0}: {1}", nameof(PrometheusMetricServer), ex));

try
{
Expand Down Expand Up @@ -133,14 +164,22 @@ protected override Task StartServer(CancellationToken cancel)
}, TaskCreationOptions.LongRunning);
}

public void ClearGauges(CollectorRegistry collectorRegistry)
private void ExportMetricAsTest(MemoryStream ms, Gauge metric)
{
var tags = string.Join(",", metric.Labels.Select(kv => $"{kv.Key}=\"{kv.Value}\""));
var formattedMetric =
metric.Key + "{" + tags + "} " + metric.Value + Environment.NewLine;
var buffer = Encoding.UTF8.GetBytes(formattedMetric);
ms.Write(buffer, 0, buffer.Length);
}

internal void ClearGauges()
{
this.collectorRegistry = collectorRegistry;
lock (@lock)
gauges.Clear();
}

public void AddGauge(Gauge gauge, KeyValuePair<MetricName, StreamMetric> metric)
internal void AddGauge(Gauge gauge, KeyValuePair<MetricName, StreamMetric> metric)
{
lock(@lock)
gauges.Add((gauge, metric.Value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
<PackageIconUrl>https://raw.githubusercontent.com/LGouellec/kafka-streams-dotnet/master/resources/logo-kafka-stream-net.png</PackageIconUrl>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\core\Streamiz.Kafka.Net.csproj" />
</ItemGroup>
Expand All @@ -27,8 +35,4 @@
</None>
</ItemGroup>

<ItemGroup>
<PackageReference Include="prometheus-net" Version="6.0.0" />
</ItemGroup>

</Project>

This file was deleted.

36 changes: 23 additions & 13 deletions samples/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.VisualBasic;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.Mock;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Public;
Expand All @@ -26,18 +28,10 @@ namespace sample_stream
/// </summary>
internal class Program
{
class Order
{
public int OrderId { get; set; }
public string ProductId { get; set; }
public DateTime OrderTime { get; set; }
}


public static async Task Main(string[] args)
{
var config = new StreamConfig();
config.ApplicationId = "test-app-cloud-events";
config.ApplicationId = "test-app-218";
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.CommitIntervalMs = 3000;
Expand All @@ -47,14 +41,30 @@ public static async Task Main(string[] args)
b.SetMinimumLevel(LogLevel.Information);
b.AddLog4Net();
});
config.UsePrometheusReporter(9090);
config.MetricsRecording = MetricsRecordingLevel.DEBUG;

StreamBuilder builder = new StreamBuilder();

builder
.Stream<string, Order, StringSerDes, JsonSerDes<Order>>("order")
.Filter((k, v) => v.OrderId >= 200)
.To<StringSerDes, CloudEventSerDes<Order>>("order-filtered");
string inputTopic = "words", outputTopic = "words-count";
TimeSpan windowSize = TimeSpan.FromDays(1);

IKStream<string, string> stream = builder.Stream<string, string>(inputTopic);
stream
.GroupByKey()
.WindowedBy(TumblingWindowOptions.Of(windowSize))
.Count()
.ToStream()
.Map((k,v) => KeyValuePair.Create(k.Key, v))
.To<StringSerDes, Int64SerDes>(outputTopic);

stream
.GroupByKey()
.Count(
InMemory.As<string, long>("count-store")
.WithKeySerdes<StringSerDes>()
.WithValueSerdes<Int64SerDes>());

Topology t = builder.Build();
KafkaStream stream1 = new KafkaStream(t, config);

Expand Down
1 change: 1 addition & 0 deletions samples/sample-stream/sample-stream.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\core\Streamiz.Kafka.Net.csproj" />
<ProjectReference Include="..\..\metrics\Streamiz.Kafka.Net.Metrics.Prometheus\Streamiz.Kafka.Net.Metrics.Prometheus.csproj" />
<ProjectReference Include="..\..\serdes\Streamiz.Kafka.Net.SerDes.CloudEvents\Streamiz.Kafka.Net.SerDes.CloudEvents.csproj" />
</ItemGroup>

Expand Down

0 comments on commit 9c2d472

Please sign in to comment.