Skip to content

Commit

Permalink
Move Kafka integration test to cloudkarafka.com #21
Browse files Browse the repository at this point in the history
  • Loading branch information
zarusz committed Mar 8, 2020
1 parent 2fb2b7a commit 7709d01
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{A6C28448-3839-490C-BE30-580C1A45E225}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
cloudkarafka-ca-root.crt = cloudkarafka-ca-root.crt
Common.Properties.xml = Common.Properties.xml
EndProjectSection
EndProject
Expand Down
39 changes: 28 additions & 11 deletions src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ namespace SlimMessageBus.Host.Kafka.Test
/// </remarks>
/// </summary>
[Trait("Category", "Integration")]
[Trait("Category", "Local")]
public class KafkaMessageBusIt : IDisposable
{
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
Expand All @@ -41,6 +40,19 @@ public class KafkaMessageBusIt : IDisposable
private MessageBusBuilder MessageBusBuilder { get; }
private Lazy<KafkaMessageBus> MessageBus { get; }

private static IDictionary<string, object> AddSsl(string username, string password, IDictionary<string, object> d)
{
// cloudkarafka.com uses SSL with SASL authentication
d.Add("security.protocol", "SASL_SSL");
d.Add("sasl.username", username);
d.Add("sasl.password", password);
d.Add("sasl.mechanism", "SCRAM-SHA-256");
d.Add("ssl.ca.location", @"cloudkarafka-ca-root.crt");
return d;
}

private string TopicPrefix { get; }

public KafkaMessageBusIt()
{
LogManager.Adapter = new DebugLoggerFactoryAdapter();
Expand All @@ -56,24 +68,28 @@ public KafkaMessageBusIt()
var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]);
var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]);

// Topics on cloudkarafka.com are prefixed with username
TopicPrefix = $"{kafkaUsername}-";

KafkaSettings = new KafkaMessageBusSettings(kafkaBrokers)
{
ProducerConfigFactory = () => new Dictionary<string, object>
ProducerConfigFactory = () => AddSsl(kafkaUsername, kafkaPassword, new Dictionary<string, object>
{
{"socket.blocking.max.ms", 1},
{"queue.buffering.max.ms", 1},
{"socket.nagle.disable", true},
//{"request.required.acks", 0}
},
ConsumerConfigFactory = (group) => new Dictionary<string, object>
}),
ConsumerConfigFactory = (group) => AddSsl(kafkaUsername, kafkaPassword, new Dictionary<string, object>
{
{"socket.blocking.max.ms", 1},
{"fetch.error.backoff.ms", 1},
{"statistics.interval.ms", 500000},
{"socket.nagle.disable", true},
{KafkaConfigKeys.ConsumerKeys.AutoOffsetReset, KafkaConfigValues.AutoOffsetReset.Earliest}
}
})
};

MessageBusBuilder = MessageBusBuilder.Create()
.WithSerializer(new JsonMessageSerializer())
.WithProviderKafka(KafkaSettings);
Expand Down Expand Up @@ -101,7 +117,7 @@ public async Task BasicPubSub()
// arrange

// ensure the topic has 2 partitions
var topic = "test-ping";
var topic = $"{TopicPrefix}test-ping";

var pingConsumer = new PingConsumer();

Expand All @@ -113,7 +129,8 @@ public async Task BasicPubSub()
// Partition #1 for odd counters
x.PartitionProvider((m, t) => m.Counter % 2);
})
.Consume<PingMessage>(x => {
.Consume<PingMessage>(x =>
{
x.Topic(topic)
.WithConsumer<PingConsumer>()
.Group("subscriber")
Expand Down Expand Up @@ -145,8 +162,8 @@ public async Task BasicPubSub()
Log.InfoFormat(CultureInfo.InvariantCulture, "Published {0} messages in {1}", messages.Count, stopwatch.Elapsed);

// consume
stopwatch.Restart();
stopwatch.Restart();

await WaitWhileMessagesAreFlowing(() => pingConsumer.Messages.Count);
var messagesReceived = pingConsumer.Messages;

Expand Down Expand Up @@ -177,7 +194,7 @@ public async Task BasicReqResp()
// arrange

// ensure the topic has 2 partitions
var topic = "test-echo";
var topic = $"{TopicPrefix}test-echo";
var echoRequestHandler = new EchoRequestHandler();

MessageBusBuilder
Expand All @@ -196,7 +213,7 @@ public async Task BasicReqResp()
.CheckpointAfter(TimeSpan.FromSeconds(60)))
.ExpectRequestResponses(x =>
{
x.ReplyToTopic("test-echo-resp");
x.ReplyToTopic($"{TopicPrefix}test-echo-resp");
x.Group("response-reader");
// for subsequent test runs allow enough time for kafka to reassign the partitions
x.DefaultTimeout(TimeSpan.FromSeconds(60));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Include="..\..\cloudkarafka-ca-root.crt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
4 changes: 2 additions & 2 deletions src/Tests/SlimMessageBus.Host.Kafka.Test/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"Kafka": {
"Brokers": "localhost:9092",
"Brokers_": "moped-01.srvs.cloudkafka.com:9094,moped-02.srvs.cloudkafka.com:9094,moped-03.srvs.cloudkafka.com:9094",
"Brokers_": "localhost:9092",
"Brokers": "moped-01.srvs.cloudkafka.com:9094,moped-02.srvs.cloudkafka.com:9094,moped-03.srvs.cloudkafka.com:9094",
"Username": "{{kafka_username}}",
"Password": "{{kafka_password}}"
}
Expand Down
34 changes: 34 additions & 0 deletions src/cloudkarafka-ca-root.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-----BEGIN CERTIFICATE-----
MIIF2DCCA8CgAwIBAgIQTKr5yttjb+Af907YWwOGnTANBgkqhkiG9w0BAQwFADCB
hTELMAkGA1UEBhMCR0IxGzAZBgNVBAgTEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4G
A1UEBxMHU2FsZm9yZDEaMBgGA1UEChMRQ09NT0RPIENBIExpbWl0ZWQxKzApBgNV
BAMTIkNPTU9ETyBSU0EgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMTAwMTE5
MDAwMDAwWhcNMzgwMTE4MjM1OTU5WjCBhTELMAkGA1UEBhMCR0IxGzAZBgNVBAgT
EkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4GA1UEBxMHU2FsZm9yZDEaMBgGA1UEChMR
Q09NT0RPIENBIExpbWl0ZWQxKzApBgNVBAMTIkNPTU9ETyBSU0EgQ2VydGlmaWNh
dGlvbiBBdXRob3JpdHkwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCR
6FSS0gpWsawNJN3Fz0RndJkrN6N9I3AAcbxT38T6KhKPS38QVr2fcHK3YX/JSw8X
pz3jsARh7v8Rl8f0hj4K+j5c+ZPmNHrZFGvnnLOFoIJ6dq9xkNfs/Q36nGz637CC
9BR++b7Epi9Pf5l/tfxnQ3K9DADWietrLNPtj5gcFKt+5eNu/Nio5JIk2kNrYrhV
/erBvGy2i/MOjZrkm2xpmfh4SDBF1a3hDTxFYPwyllEnvGfDyi62a+pGx8cgoLEf
Zd5ICLqkTqnyg0Y3hOvozIFIQ2dOciqbXL1MGyiKXCJ7tKuY2e7gUYPDCUZObT6Z
+pUX2nwzV0E8jVHtC7ZcryxjGt9XyD+86V3Em69FmeKjWiS0uqlWPc9vqv9JWL7w
qP/0uK3pN/u6uPQLOvnoQ0IeidiEyxPx2bvhiWC4jChWrBQdnArncevPDt09qZah
SL0896+1DSJMwBGB7FY79tOi4lu3sgQiUpWAk2nojkxl8ZEDLXB0AuqLZxUpaVIC
u9ffUGpVRr+goyhhf3DQw6KqLCGqR84onAZFdr+CGCe01a60y1Dma/RMhnEw6abf
Fobg2P9A3fvQQoh/ozM6LlweQRGBY84YcWsr7KaKtzFcOmpH4MN5WdYgGq/yapiq
crxXStJLnbsQ/LBMQeXtHT1eKJ2czL+zUdqnR+WEUwIDAQABo0IwQDAdBgNVHQ4E
FgQUu69+Aj36pvE8hI6t7jiY7NkyMtQwDgYDVR0PAQH/BAQDAgEGMA8GA1UdEwEB
/wQFMAMBAf8wDQYJKoZIhvcNAQEMBQADggIBAArx1UaEt65Ru2yyTUEUAJNMnMvl
wFTPoCWOAvn9sKIN9SCYPBMtrFaisNZ+EZLpLrqeLppysb0ZRGxhNaKatBYSaVqM
4dc+pBroLwP0rmEdEBsqpIt6xf4FpuHA1sj+nq6PK7o9mfjYcwlYRm6mnPTXJ9OV
2jeDchzTc+CiR5kDOF3VSXkAKRzH7JsgHAckaVd4sjn8OoSgtZx8jb8uk2Intzna
FxiuvTwJaP+EmzzV1gsD41eeFPfR60/IvYcjt7ZJQ3mFXLrrkguhxuhoqEwWsRqZ
CuhTLJK7oQkYdQxlqHvLI7cawiiFwxv/0Cti76R7CZGYZ4wUAc1oBmpjIXUDgIiK
boHGhfKppC3n9KUkEEeDys30jXlYsQab5xoq2Z0B15R97QNKyvDb6KkBPvVWmcke
jkk9u+UJueBPSZI9FoJAzMxZxuY67RIuaTxslbH9qh17f4a+Hg4yRvv7E491f0yL
S0Zj/gA0QHDBw7mh3aZw4gSzQbzpgJHqZJx64SIDqZxubw5lT2yHh17zbqD5daWb
QOhTsiedSrnAdyGN/4fy3ryM7xfft0kL0fJuMAsaDk527RH89elWsn2/x20Kk4yl
0MC2Hb46TpSi125sC8KKfPog88Tk5c0NqMuRkrF8hey1FGlmDoLnzc7ILaZRfyHB
NVOFBkpdn627G190
-----END CERTIFICATE-----

0 comments on commit 7709d01

Please sign in to comment.