Merge branch 'develop' into feature/processor-api
LGouellec committed Oct 18, 2022
2 parents 4870e88 + d888046 commit 646f61b
Showing 82 changed files with 1,222 additions and 505 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -23,7 +23,7 @@
# END Dependencies for RocksDB
- run: set JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/
- run: export JAVA_HOME
- - run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs"
+ - run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs"
- run: dotnet build
- run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN}
1 change: 1 addition & 0 deletions .github/workflows/integration.yml
@@ -10,6 +10,7 @@ jobs:
runs-on: ubuntu-latest

steps:
+ - uses: actions/checkout@v2
- name: Setup .NET 6.0
uses: actions/setup-dotnet@v1
with:
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -41,4 +41,4 @@ jobs:
- name: Pack Metrics Prometheus
run: dotnet pack metrics/Streamiz.Kafka.Net.Metrics.Prometheus/Streamiz.Kafka.Net.Metrics.Prometheus.csproj --configuration Release --no-build --no-restore
- name: Publish in nuget.org
- run: dotnet nuget push **/*symbols.nupkg -k ${{ secrets.NUGET_PACKAGE_TOKEN }} -s https://api.nuget.org/v3/index.json -n -d
+ run: dotnet nuget push **/*.nupkg -k ${{ secrets.NUGET_PACKAGE_TOKEN }} -s https://api.nuget.org/v3/index.json -n -d
9 changes: 9 additions & 0 deletions core/Crosscutting/DictionaryExtensions.cs
@@ -118,6 +118,15 @@ public static bool TryAddOrUpdate<K, V>(this ConcurrentDictionary<K, V> source,
return source.TryAdd(key, value);
}
}


+ public static void CreateListOrAdd<K, V>(this IDictionary<K, List<V>> source, K key, V value)
+ {
+     if(source.ContainsKey(key))
+         source[key].Add(value);
+     else
+         source.Add(key, new List<V>{value});
+ }

}
}
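
Note: a minimal sketch of how the new CreateListOrAdd extension behaves (the dictionary and values below are illustrative, not from the commit; assumes the usual System.Collections.Generic usings):

    var sinks = new Dictionary<string, List<string>>();
    sinks.CreateListOrAdd("output-topic", "sink-1"); // key absent: list created -> ["sink-1"]
    sinks.CreateListOrAdd("output-topic", "sink-2"); // key present: appended    -> ["sink-1", "sink-2"]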
3 changes: 1 addition & 2 deletions core/KafkaStream.cs
@@ -275,8 +275,7 @@ public override string ToString()
/// <param name="configuration">configuration about this stream</param>
public KafkaStream(Topology topology, IStreamConfig configuration)
: this(topology, configuration, new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(configuration), configuration))
- {
- }
+ { }

/// <summary>
/// Create a <see cref="KafkaStream"/> instance with your own <see cref="IKafkaSupplier" />
6 changes: 3 additions & 3 deletions core/Mock/Kafka/MockCluster.cs
@@ -406,7 +406,7 @@ private void NeedRebalance2(string groupId, string consumerName)
if (!partitionsToRevoked.ContainsKey(consumer))
partitionsToRevoked.Add(consumer, new List<TopicPartitionOffset>());

- var tpo = consumer.TopicPartitionsOffset.FirstOrDefault(t =>
+ var tpo = consumer.TopicPartitionsOffset.First(t =>
t.TopicPartition.Equals(part));
partitionsToRevoked[consumer]
.Add(new TopicPartitionOffset(part, tpo.OffsetConsumed));
@@ -619,7 +619,7 @@ private void AssignPartitionOffset(MockConsumerInformation consumerInformation,
if (consumerInformation.TopicPartitionsOffset.Contains(new MockTopicPartitionOffset()
{Partition = tp.Partition.Value, Topic = tp.Topic}))
{
- var tpo = consumerInformation.TopicPartitionsOffset.FirstOrDefault(t =>
+ var tpo = consumerInformation.TopicPartitionsOffset.First(t =>
t.Partition.Equals(tp.Partition.Value) && t.Topic.Equals(tp.Topic));
tpo.OffsetComitted = offset;
tpo.OffsetConsumed = offset;
@@ -646,7 +646,7 @@ private void AssignPartition(MockConsumerInformation consumerInformation,
if (consumerInformation.TopicPartitionsOffset.Contains(new MockTopicPartitionOffset()
{Partition = tp.Partition.Value, Topic = tp.Topic}))
{
- var tpo = consumerInformation.TopicPartitionsOffset.FirstOrDefault(t =>
+ var tpo = consumerInformation.TopicPartitionsOffset.First(t =>
t.Partition.Equals(tp.Partition.Value) && t.Topic.Equals(tp.Topic));
tpo.OffsetComitted = offset.Offset;
tpo.OffsetConsumed = offset.Offset;
2 changes: 1 addition & 1 deletion core/Mock/Kafka/MockConsumer.cs
@@ -113,7 +113,7 @@ public Offset Position(TopicPartition partition)
{
var info = cluster.GetConsumerInformation(Name);
return info != null ?
- info.TopicPartitionsOffset.FirstOrDefault(t => t.TopicPartition.Equals(partition)).OffsetConsumed :
+ info.TopicPartitionsOffset.First(t => t.TopicPartition.Equals(partition)).OffsetConsumed :
Offset.Unset;
}

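
Note: the FirstOrDefault -> First swaps in MockCluster and MockConsumer sit at call sites where a preceding Contains or null check already guarantees the element exists. A minimal illustration of the difference (list contents hypothetical):

    var offsets = new List<string> { "a", "b" };
    // offsets.First(s => s == "c");          // throws InvalidOperationException at the lookup: fails fast
    // offsets.FirstOrDefault(s => s == "c")  // returns null, so the later member access
    //                                        // (e.g. .OffsetConsumed) throws NullReferenceException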
8 changes: 7 additions & 1 deletion core/Processors/GlobalStreamThread.cs
@@ -39,7 +39,13 @@ public StateConsumer(
public void Initialize()
{
IDictionary<TopicPartition, long> partitionOffsets = globalStateMaintainer.Initialize();
- globalConsumer.Assign(partitionOffsets.Keys.Select(x => new TopicPartitionOffset(x, partitionOffsets[x])));
+ globalConsumer.Assign(
+     partitionOffsets
+         .Keys
+         .Select(
+             x => partitionOffsets[x] >= 0 ?
+                 new TopicPartitionOffset(x, partitionOffsets[x] + 1)
+                 : new TopicPartitionOffset(x, Offset.Beginning)));

lastFlush = DateTime.Now;
}
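
Note: a condensed restatement of the new assignment rule (the helper name is illustrative; the commit inlines this logic in Initialize):

    // A checkpointed offset marks the last record already applied to the global
    // store, so consumption resumes one past it; a negative offset means no
    // checkpoint exists and the store is rebuilt from the beginning of the topic.
    TopicPartitionOffset ResumePosition(TopicPartition tp, long checkpointed) =>
        checkpointed >= 0
            ? new TopicPartitionOffset(tp, checkpointed + 1)
            : new TopicPartitionOffset(tp, Offset.Beginning);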
17 changes: 11 additions & 6 deletions core/Processors/Internal/GlobalStateManager.cs
@@ -169,7 +169,7 @@ public void Register(IStateStore store, StateRestoreCallback callback)
log.LogInformation($"Restoring state for global store {store.Name}");
var topicPartitions = TopicPartitionsForStore(store).ToList();
var highWatermarks = OffsetsChangelogs(topicPartitions);

try
{
RestoreState(
@@ -181,6 +181,7 @@ public void Register(IStateStore store, StateRestoreCallback callback)
finally
{
globalConsumer.Unassign();
log.LogInformation($"Global store {store.Name} is completely restored");
}
}

@@ -219,7 +220,7 @@ private IEnumerable<TopicPartition> TopicPartitionsForStore(IStateStore store)
private void RestoreState(
StateRestoreCallback restoreCallback,
List<TopicPartition> topicPartitions,
- IDictionary<TopicPartition, (Offset, Offset)> highWatermarks,
+ IDictionary<TopicPartition, (Offset, Offset)> offsetWatermarks,
Func<ConsumeResult<byte[], byte[]>, ConsumeResult<byte[], byte[]>> recordConverter)
{
foreach (var topicPartition in topicPartitions)
@@ -233,22 +234,26 @@ private void RestoreState(

globalConsumer.Assign((new TopicPartitionOffset(topicPartition, new Offset(checkpoint))).ToSingle());
offset = checkpoint;
- highWM = highWatermarks[topicPartition].Item2;
+ var lowWM = offsetWatermarks[topicPartition].Item1;
+ highWM = offsetWatermarks[topicPartition].Item2;

while (offset < highWM - 1)
{
if (offset == Offset.Beginning && highWM == 0) // no message into local and topics;
break;

+ if (lowWM == highWM) // if low offset == high offset
+     break;

var records = globalConsumer.ConsumeRecords(TimeSpan.FromMilliseconds(config.PollMs),
config.MaxPollRestoringRecords).ToList();

+ var convertedRecords = records.Select(r => recordConverter(r)).ToList();

- foreach(var record in records)
+ foreach(var record in convertedRecords)
restoreCallback?.Invoke(Bytes.Wrap(record.Message.Key), record.Message.Value, record.Message.Timestamp.UnixTimestampMs);

- if (records.Any())
+ if (convertedRecords.Any())
offset = records.Last().Offset;
}

@@ -260,7 +265,7 @@ private void RestoreState(
{
return topicPartitions
.Select(tp => {
- var offsets = globalConsumer.GetWatermarkOffsets(tp);
+ var offsets = globalConsumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
return new
{
TopicPartition = tp,
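
Note: two behavioral fixes land in RestoreState. First, the restore callback now receives the converted records (the recordConverter output) rather than the raw consumed ones. Second, watermarks come from QueryWatermarkOffsets, which queries the broker synchronously with the five-second timeout shown, whereas GetWatermarkOffsets only returns locally cached values and can be stale before the consumer has fetched anything. A simplified skeleton of the loop after the change (condensed from the diff; surrounding variables as defined there):

    while (offset < highWM - 1)
    {
        if (offset == Offset.Beginning && highWM == 0) // empty partition, nothing local
            break;
        if (lowWM == highWM)                           // low == high: no records to replay
            break;

        var records = globalConsumer.ConsumeRecords(pollTimeout, maxRecords).ToList();
        var convertedRecords = records.Select(r => recordConverter(r)).ToList();

        foreach (var record in convertedRecords)       // feed converted records to the store
            restoreCallback?.Invoke(Bytes.Wrap(record.Message.Key),
                record.Message.Value, record.Message.Timestamp.UnixTimestampMs);

        if (convertedRecords.Any())
            offset = records.Last().Offset;            // advance using the raw consumer offset
    }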
10 changes: 5 additions & 5 deletions core/Processors/Internal/InternalTopologyBuilder.cs
@@ -354,7 +354,7 @@ private ProcessorTopology BuildTopology(ISet<string> nodeGroup, TaskId taskId)
// need refactor a little for repartition topic/processor source & sink etc .. change topic name
IProcessor rootProcessor = new RootProcessor();
IDictionary<string, IProcessor> sources = new Dictionary<string, IProcessor>();
- IDictionary<string, IProcessor> sinks = new Dictionary<string, IProcessor>();
+ IDictionary<string, List<IProcessor>> sinks = new Dictionary<string, List<IProcessor>>();
IDictionary<string, IProcessor> processors = new Dictionary<string, IProcessor>();
IDictionary<string, IStateStore> stateStores = new Dictionary<string, IStateStore>();
IList<string> repartitionTopics = new List<string>();
@@ -408,7 +408,7 @@ private ProcessorTopology BuildTopology(ISet<string> nodeGroup, TaskId taskId)
private void BuildSinkNode(
IDictionary<string, IProcessor> processors,
IList<string> repartitionTopics,
- IDictionary<string, IProcessor> sinks,
+ IDictionary<string, List<IProcessor>> sinks,
ISinkNodeFactory factory,
IProcessor processor)
{
@@ -423,14 +423,14 @@ private void BuildSinkNode(
{
var repartitionTopic = DecorateTopic(factory.Topic);
repartitionTopics.Add(repartitionTopic);
- sinks.Add(repartitionTopic, processor);
+ sinks.CreateListOrAdd(repartitionTopic, processor);
((ISinkProcessor) processor).UseRepartitionTopic(repartitionTopic);
}
else
- sinks.Add(factory.Topic, processor);
+ sinks.CreateListOrAdd(factory.Topic, processor);
}
else
- sinks.Add(factory.Name, processor);
+ sinks.CreateListOrAdd(factory.Name, processor);
}

private void BuildSourceNode(
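
Note: widening sinks from IDictionary&lt;string, IProcessor&gt; to IDictionary&lt;string, List&lt;IProcessor&gt;&gt; lets several sink nodes write the same topic; previously the second sink on a topic made Dictionary.Add throw ArgumentException. A minimal sketch using the new extension (processorA/processorB stand in for hypothetical sink processors):

    IDictionary<string, List<IProcessor>> sinks = new Dictionary<string, List<IProcessor>>();
    sinks.CreateListOrAdd("topic-a", processorA); // first sink on "topic-a": list created
    sinks.CreateListOrAdd("topic-a", processorB); // second sink: appended, no ArgumentException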
7 changes: 6 additions & 1 deletion core/Processors/Internal/StoreChangelogReader.cs
@@ -247,13 +247,18 @@ internal bool HasRestoredToEnd(ChangelogMetadata changelogMetadata)
long? endOffset = changelogMetadata.RestoreEndOffset;
if (endOffset == null || endOffset == Offset.Unset || endOffset == 0)
return true;


+ if(changelogMetadata.CurrentOffset >= endOffset)
+     return true;

if (!changelogMetadata.BufferedRecords.Any())
{
var offset = restoreConsumer.Position(changelogMetadata.StoreMetadata.ChangelogTopicPartition);
return offset != Offset.Unset && offset >= endOffset;
}

- return changelogMetadata.CurrentOffset >= endOffset;
+ return false;
}

private void BufferedRecords(IEnumerable<ConsumeResult<byte[], byte[]>> records)
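
Note: a condensed restatement of the reworked HasRestoredToEnd decision order:

    // 1. endOffset null / Unset / 0      -> restored (nothing to replay)
    // 2. currentOffset >= endOffset      -> restored (new early exit, added above; avoids
    //                                       an unnecessary restoreConsumer.Position() call)
    // 3. buffer drained                  -> restored iff the consumer position >= endOffset
    // 4. records still buffered          -> not restored yet (now an explicit return false)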
10 changes: 5 additions & 5 deletions core/Processors/StreamThread.cs
@@ -21,28 +21,28 @@ internal class StreamThread : IThread

public static string GetTaskProducerClientId(string threadClientId, TaskId taskId)
{
return threadClientId + "-" + taskId + "-producer";
return threadClientId + "-" + taskId + "-streamiz-producer";
}

public static string GetThreadProducerClientId(string threadClientId)
{
return threadClientId + "-producer";
return threadClientId + "-streamiz-producer";
}

public static string GetConsumerClientId(string threadClientId)
{
return threadClientId + "-consumer";
return threadClientId + "-streamiz-consumer";
}

public static string GetRestoreConsumerClientId(string threadClientId)
{
return threadClientId + "-restore-consumer";
return threadClientId + "-streamiz-restore-consumer";
}

// currently admin client is shared among all threads
public static string GetSharedAdminClientId(string clientId)
{
return clientId + "-admin";
return clientId + "-streamiz-admin";
}

internal static IThread Create(string threadId, string clientId, InternalTopologyBuilder builder,
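
Note: every derived client id now carries a "-streamiz-" infix, which makes the library's producers and consumers easy to spot in broker logs and metrics. For a thread client id of "app-thread-1" (an illustrative input; StreamThread is internal, so the calls below only demonstrate the naming):

    StreamThread.GetThreadProducerClientId("app-thread-1");  // "app-thread-1-streamiz-producer"
    StreamThread.GetConsumerClientId("app-thread-1");        // "app-thread-1-streamiz-consumer"
    StreamThread.GetRestoreConsumerClientId("app-thread-1"); // "app-thread-1-streamiz-restore-consumer"
    StreamThread.GetSharedAdminClientId("app");              // "app-streamiz-admin"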
2 changes: 1 addition & 1 deletion core/State/InMemory/InMemoryKeyValueBytesStoreSupplier.cs
@@ -26,7 +26,7 @@ public InMemoryKeyValueBytesStoreSupplier(string name)
/// <summary>
/// Name of this state store supplier. This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
/// </summary>
- public string Name { get; }
+ public string Name { get; set; }

/// <summary>
/// Return a new <see cref="IStateStore"/> instance.
10 changes: 4 additions & 6 deletions core/State/InMemory/InMemoryWindowStoreSupplier.cs
@@ -9,8 +9,6 @@ namespace Streamiz.Kafka.Net.State.InMemory
/// </summary>
public class InMemoryWindowStoreSupplier : IWindowBytesStoreSupplier
{
- private readonly TimeSpan retention;

/// <summary>
/// Constructor
/// </summary>
@@ -20,7 +18,7 @@ public class InMemoryWindowStoreSupplier : IWindowBytesStoreSupplier
public InMemoryWindowStoreSupplier(string storeName, TimeSpan retention, long? size)
{
Name = storeName;
- this.retention = retention;
+ Retention = (long) retention.TotalMilliseconds;
WindowSize = size;
}

@@ -32,7 +30,7 @@ public InMemoryWindowStoreSupplier(string storeName, TimeSpan retention, long? size)
/// <summary>
/// Name of state store
/// </summary>
- public string Name { get; }
+ public string Name { get; set; }

/// <summary>
/// Window size of state store
@@ -42,14 +40,14 @@ public InMemoryWindowStoreSupplier(string storeName, TimeSpan retention, long? size)
/// <summary>
/// Retention period of state store
/// </summary>
- public long Retention => (long)retention.TotalMilliseconds;
+ public long Retention { get; set; }

/// <summary>
/// Return a new <see cref="IWindowStore{K, V}"/> instance.
/// </summary>
/// <returns>Return a new <see cref="IWindowStore{K, V}"/> instance.</returns>
public IWindowStore<Bytes, byte[]> Get()
- => new InMemoryWindowStore(Name, retention, WindowSize.Value);
+ => new InMemoryWindowStore(Name, TimeSpan.FromMilliseconds(Retention), WindowSize.Value);

}
}
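
Note: Retention switches from a property computed off a stored TimeSpan to a plain settable long in milliseconds (matching the IWindowBytesStoreSupplier contract change below), and Get() converts back with TimeSpan.FromMilliseconds. A small sketch under that assumption (store name and durations illustrative):

    var supplier = new InMemoryWindowStoreSupplier(
        "my-window-store",
        TimeSpan.FromHours(1),                              // initial retention
        (long) TimeSpan.FromMinutes(5).TotalMilliseconds);  // window size in ms

    supplier.Retention = (long) TimeSpan.FromHours(24).TotalMilliseconds; // adjustable after construction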
8 changes: 7 additions & 1 deletion core/State/Internal/CompositeReadOnlyKeyValueStore.cs
@@ -80,7 +80,13 @@ public V Get(K key)
IEnumerable<IReadOnlyKeyValueStore<K, V>> stores = GetAllStores();
try
{
- return stores.FirstOrDefault(x => x.Get(key) != null).Get(key);
+ foreach (var store in stores)
+ {
+     var value = store.Get(key);
+     if (value != null)
+         return value;
+ }
+ return default;
}
catch (InvalidStateStoreException e)
{
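
Note: the replaced one-liner, stores.FirstOrDefault(x => x.Get(key) != null).Get(key), probed each matching store's Get twice and threw NullReferenceException when no store held the key. The loop above reads each store once and returns a miss gracefully:

    // old: two Get calls per matching store; NullReferenceException when every store misses
    // new: one Get call per store; default(V) (null for reference types) when the key is absent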
2 changes: 1 addition & 1 deletion core/State/RocksDb/RocksDbKeyValueBytesStoreSupplier.cs
@@ -25,7 +25,7 @@ public RocksDbKeyValueBytesStoreSupplier(string name)
/// <summary>
/// State store name
/// </summary>
- public string Name { get; }
+ public string Name { get; set; }

/// <summary>
/// Build the rocksdb state store.
7 changes: 3 additions & 4 deletions core/State/RocksDb/RocksDbWindowBytesStoreSupplier.cs
@@ -10,7 +10,6 @@ namespace Streamiz.Kafka.Net.State.RocksDb
/// </summary>
public class RocksDbWindowBytesStoreSupplier : IWindowBytesStoreSupplier
{
- private readonly TimeSpan retention;
private readonly long segmentInterval;

/// <summary>
Expand All @@ -27,7 +26,7 @@ public RocksDbWindowBytesStoreSupplier(
long? size)
{
Name = storeName;
- this.retention = retention;
+ Retention = (long)retention.TotalMilliseconds;
this.segmentInterval = segmentInterval;
WindowSize = size;
}
@@ -45,12 +44,12 @@ public RocksDbWindowBytesStoreSupplier(
/// <summary>
/// Retention of the state store
/// </summary>
- public long Retention => (long)retention.TotalMilliseconds;
+ public long Retention { get; set; }

/// <summary>
/// State store name
/// </summary>
- public string Name { get; }
+ public string Name { get; set; }

/// <summary>
/// Build the rocksdb state store.
2 changes: 1 addition & 1 deletion core/State/Supplier/IStoreSupplier.cs
@@ -13,7 +13,7 @@ public interface IStoreSupplier<out T>
/// Return the name of this state store supplier.
/// This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
/// </summary>
- string Name { get; }
+ string Name { get; set; }

/// <summary>
/// Return a new <see cref="IStateStore"/> instance of type <typeparamref name="T"/>.
2 changes: 1 addition & 1 deletion core/State/Supplier/IWindowBytesStoreSupplier.cs
@@ -21,6 +21,6 @@ public interface IWindowBytesStoreSupplier : IStoreSupplier<IWindowStore<Bytes, byte[]>>
/// <summary>
/// The time period for which the <see cref="IWindowStore{K, V}"/> will retain historic data.
/// </summary>
- public long Retention { get; }
+ public long Retention { get; set; }
}
}
3 changes: 2 additions & 1 deletion core/Stream/Internal/GroupedStreamAggregateBuilder.cs
@@ -90,7 +90,8 @@ private string Repartition(StoreBuilder storeBuilder)
{
if (repartitionRequired)
{
- (string sourceName, RepartitionNode<K,V> repartNode) = KStream<K, V>.CreateRepartitionSource(grouped.Named ?? storeBuilder.Name, grouped.Key, grouped.Value, builder);
+ string suffix = grouped.Named ?? storeBuilder.Name;
+ (string sourceName, RepartitionNode<K,V> repartNode) = KStream<K, V>.CreateRepartitionSource(suffix, grouped.Key, grouped.Value, builder);

if (repartitionNode == null || grouped.Named == null)
repartitionNode = repartNode;
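
Note: a small extraction for readability; the repartition source name still prefers the user-supplied grouping name and falls back to the store builder's name:

    // grouped.Named takes precedence; storeBuilder.Name is the fallback
    string suffix = grouped.Named ?? storeBuilder.Name;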
[Diffs for the remaining changed files were not loaded in this view.]
