Skip to content

Commit

Permalink
[Host.RabbitMQ] Automatically ack messages #21
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Mar 3, 2024
1 parent 022e735 commit b188799
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,25 @@
/// Specifies how messages are confirmed with RabbitMq
/// </summary>
public enum RabbitMqMessageAcknowledgementMode

{
/// <summary>
/// Eeach succesfully processed message will get an Ack after processing, or when error happens the message will get an Nack in the end.
/// However, if the user made anyd manual ConsumerContext.Ack() or ConsumerContext.Nack() during the consumption process or in an interceptor, it will be used to confirm the message.
/// This results in at-least-once delivery guarantee.
/// </summary>
/// <remarks>Default option that is set</remarks>
ConfirmAfterMessageProcessingWhenNoManualConfirmMade = 0,

/// <summary>
/// The message will be Ack before the actuall message processing will start.
/// This results in at-most-once delivery guarantee (messages could be list if processing did not fully finish).
/// Each message will get an Ack after successful processing, or when error happens the message will get an Nack in the end.
/// However, if the user made any manual ConsumerContext.Ack() or ConsumerContext.Nack() during the consumption process (or in an interceptor), that will be used to confirm the message instead.
/// This results in at-least-once delivery guarantee and a safe processing.
/// </summary>
AckMessageBeforeProcessing = 1,
/// <remarks>That is the default option</remarks>
AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade = 0,

/// <summary>
/// The message will already be considered as Ack upon recieve. See https://www.rabbitmq.com/docs/confirms#acknowledgement-modes for details.
/// This results in at-most-once delivery guarantee (messages could be list if processing did not fully finish).
/// This is managed by the protocol and should give faster throughput than <see cref="RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing"/>.
/// This results in at-most-once delivery guarantee (messages could be lost if processing would not fully finish).
/// This is managed by the protocol and should give faster throughput than <see cref="RabbitMqMessageAcknowledgementMode.AckMessageBeforeProcessing"/> while leading to same delivery guarantees.
/// </summary>
AckAutomaticByRabbit = 1,

/// <summary>
/// The message will be Ack-ed by SMB before the actuall message processing starts.
/// This results in at-most-once delivery guarantee (messages could be lost if processing would not fully finish).
/// </summary>
AckAutomaticByRabbit = 2,
AckMessageBeforeProcessing = 2,
}
12 changes: 9 additions & 3 deletions src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel,
{
_messageBus = messageBus;
_acknowledgementMode = consumers.Select(x => x.GetOrDefault<RabbitMqMessageAcknowledgementMode?>(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
?? RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
_messageProcessor = new MessageProcessor<BasicDeliverEventArgs>(
consumers,
messageBus,
Expand All @@ -27,6 +27,12 @@ public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel,

private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
{
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)
{
// mark the message has already been confirmed when in automatic acknowledgment
consumerContext.Properties[ContextProperty_MessageConfirmed] = true;
}

// provide transport message
consumerContext.SetTransportMessage(transportMessage);
// provide methods to confirm message
Expand Down Expand Up @@ -76,7 +82,7 @@ protected override async Task<Exception> OnMessageRecieved(Dictionary<string, ob
var (exception, _, _, message) = await _messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);
if (exception == null)
{
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade)
{
// Acknowledge after processing
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOption.Ack, consumerContextProperties);
Expand Down Expand Up @@ -107,7 +113,7 @@ private async Task OnMessageError(BasicDeliverEventArgs transportMessage, Dictio
}
}

if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade)
{
// NAck after processing when message fails (unless the user already acknowledged in any way).
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOption.Nack, consumerContextProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer
{
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade;
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.AutoConfirmAfterMessageProcessingWhenNoManualConfirmMade;

public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
Expand Down

0 comments on commit b188799

Please sign in to comment.