Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
EtherZa committed Jul 17, 2024
1 parent d1865a5 commit de87cc0
Showing 1 changed file with 60 additions and 47 deletions.
107 changes: 60 additions & 47 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.Outbox.Sql;

using Azure;

public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository
{
private readonly SqlOutboxTemplate _sqlTemplate;
Expand Down Expand Up @@ -43,57 +45,68 @@ await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutbo

public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token)
{
await EnsureConnection();

using var cmd = CreateCommand();
cmd.CommandText = tableLock ? _sqlTemplate.SqlOutboxMessageLockTableAndSelect : _sqlTemplate.SqlOutboxMessageLockAndSelect;
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId;
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

using var reader = await cmd.ExecuteReaderAsync(token);

var idOrdinal = reader.GetOrdinal("Id");
var timestampOrdinal = reader.GetOrdinal("Timestamp");
var busNameOrdinal = reader.GetOrdinal("BusName");
var typeOrdinal = reader.GetOrdinal("MessageType");
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
var headersOrdinal = reader.GetOrdinal("Headers");
var pathOrdinal = reader.GetOrdinal("Path");
var instanceIdOrdinal = reader.GetOrdinal("InstanceId");
var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId");
var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn");
var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt");
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");

var items = new List<OutboxMessage>();
while (await reader.ReadAsync(token).ConfigureAwait(false))
try
{
var id = reader.GetGuid(idOrdinal);
var messageType = reader.GetString(typeOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
await EnsureConnection();

using var cmd = CreateCommand();
cmd.CommandText = tableLock ? _sqlTemplate.SqlOutboxMessageLockTableAndSelect : _sqlTemplate.SqlOutboxMessageLockAndSelect;
cmd.Parameters.Add("@InstanceId", SqlDbType.NVarChar).Value = instanceId;
cmd.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
cmd.Parameters.Add("@LockDuration", SqlDbType.Int).Value = lockDuration.TotalSeconds;

using var reader = await cmd.ExecuteReaderAsync(token);

var idOrdinal = reader.GetOrdinal("Id");
var timestampOrdinal = reader.GetOrdinal("Timestamp");
var busNameOrdinal = reader.GetOrdinal("BusName");
var typeOrdinal = reader.GetOrdinal("MessageType");
var payloadOrdinal = reader.GetOrdinal("MessagePayload");
var headersOrdinal = reader.GetOrdinal("Headers");
var pathOrdinal = reader.GetOrdinal("Path");
var instanceIdOrdinal = reader.GetOrdinal("InstanceId");
var lockInstanceIdOrdinal = reader.GetOrdinal("LockInstanceId");
var lockExpiresOnOrdinal = reader.GetOrdinal("LockExpiresOn");
var deliveryAttemptOrdinal = reader.GetOrdinal("DeliveryAttempt");
var deliveryCompleteOrdinal = reader.GetOrdinal("DeliveryComplete");
var deliveryAbortedOrdinal = reader.GetOrdinal("DeliveryAborted");

var items = new List<OutboxMessage>();
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = Settings.MessageTypeResolver.ToType(messageType) ?? throw new MessageBusException($"Outbox message with Id {id} - the MessageType {messageType} is not recognized. The type might have been renamed or moved namespaces."),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
};

items.Add(message);
var id = reader.GetGuid(idOrdinal);
var messageType = reader.GetString(typeOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = Settings.MessageTypeResolver.ToType(messageType) ?? throw new MessageBusException($"Outbox message with Id {id} - the MessageType {messageType} is not recognized. The type might have been renamed or moved namespaces."),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
};

items.Add(message);
}

return items;
}
catch (SqlException) when (token.IsCancellationRequested)
{
// Avoid - Microsoft.Data.SqlClient.SqlException (0x80131904): A severe error occurred on the current command. The results, if any, should be discarded. Operation cancelled by user.
token.ThrowIfCancellationRequested();

return items;
// should never get here
throw;
}
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
Expand Down

0 comments on commit de87cc0

Please sign in to comment.