-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Host.Outbox] Allow implementation OutboxRepository for a specific type PK #311
Comments
Signed-off-by: krivchenko_kv <kvkrivchenko@activebc.ru>
In principle, it's great to extend the Outbox implementation to Postgres and thanks for taking the time. I wonder if we truly need to provide that much PK flexibility which will bring in complexity to accommodate all the particulars:
Is there a strong need to avoid UUIDs/Guids for performance reasons and to prefer monotonically increasing PK (values generated by the DB)? These are just my thoughts thinking out loud and perhaps I am missing something (have not worked with Postgress for a while). Please let me know. |
I am trying to solve the following infrastructure-specific problem. A large number of messages and slow consumers overflow a bus with a limited size, so before sending I need to make sure that the bus is ready to receive messages. Moreover, I need to hold all incoming messages for a long time - that's why partitions As for generating an identifier and choosing a type for the key, the problem of "getting a timestamp from [ULID vs UUID7] and writing to a hot index area (when adding new records) + time-based partitions will be solved At this stage it is still a draft and I am not sure about the correctness of the idea (It smells like Kafka in here.) But I still think it's right to introduce a generator method for choosing a strategy (https://steven-giesel.com/blogPost/ea42a518-4d8b-4e08-8f73-e542bdd3b983) public interface IOutboxRepository
{
Task<Guid> GenerateId(CancellationToken cancellationToken); // for client or database |
|
Ok I understand now the use case. Let me look at the PR late today. |
Guid is used in many databases, so the as generic type is probably overhead... ps. The only thing missing is PKey control (want Genid). |
@dundich I reviewed thePR #312 as well as the discussion in here. This PR #312 causes a generics explosion that is dragged across many layers... Here #314 is my quick proposal to allow flexible PKs, including preservation of existing Guids, but opening this up leveraging UUIDv7 capability (or other types) to achieve ordered PKs. The key elements are:
The PR is draft, few test need to be fixed. Let me know your thoughts. |
looks better but I have some comments, ... later |
// +
public interface ITimeResolver
{
DateTime GetUtcNow();
}
public class TimeResolver : ITimeResolver
{
public DateTime GetUtcNow() => DateTime.UtcNow;
}
// ~
public class GuidOutboxMessageAdapter(ITimeResolver timeResolver) : IOutboxMessageAdapter
{
public OutboxMessage Create()
=> new GuidOutboxMessage
{
Id = Guid.NewGuid(),
Timestamp = timeResolver.GetUtcNow() // <= ITimeResolver
};
// ....
}
// ~ this will affect the internal class OutboxSendingTask +
// ~ init OutboxMessage
public abstract class OutboxMessage
{
public DateTime Timestamp { get; set; } // = DateTime.UtcNow; no need |
public interface IOutboxMessagePooler
{
OutboxMessage[] Rent(int batchSize);
void Return(OutboxMessage[] array);
}
public class OutboxMessagePooler: IOutboxMessagePooler
{
public OutboxMessage[] Rent(int batchSize) => ArrayPool<OutboxMessage>.Shared.Rent(batchSize);
public void Return(OutboxMessage[] array) => ArrayPool<OutboxMessage>.Shared.Return(array);
} LockAndWrite instead of LockAndSelect public interface IOutboxRepository
{
Task<int> LockAndWrite(Memory<OutboxMessage> buffer, string instanceId, int batchSize, ... CancellationToken token); ~ class OutboxSendingTask.SendMessages async internal Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken cancellationToken)
{
//....
do
{
int batchSize = _outboxSettings.PollBatchSize;
var array = messagePooler.Rent(batchSize); // <= 1
try
{
var buffer = array.AsMemory(0, batchSize);
var writtenCount = await outboxRepository.LockAndWrite(buffer, lockRenewalTimer.InstanceId, batchSize, _outboxSettings.MaintainSequence, lockRenewalTimer.LockDuration, cts.Token);
var outboxMessages = buffer.Slice(0, writtenCount);
var (RunAgain, Count) = await ProcessMessages(outboxRepository, outboxMessages, compositeMessageBus, messageBusTarget, cts.Token);
}
finally
{
messagePooler.Return(array); // <= 2
}
} while ();
} ps. There's an internal mechanism here and I think we can play with it. After the release of C# 8, all of its features were available only in projects compatible with .NET Standard version 2.1 or higher. https://stackoverflow.com/questions/406485/array-slices-in-c-sharp/65898435#65898435 public interface IOutboxRepository
{
Task<int> LockAndWrite(OutboxMessage[] buffer, string instanceId, int batchSize, ... CancellationToken token); |
-- I think not that many. Type conversion and working with Reflections looks a bit heavy. I'm more concerned about GC performance here. I would return the Id field to the OutboxMessage class... |
If we can stick with the
Yes, these are nice ideas to incorporate. As always to be compatible with netstandard2.0 some #if preprocessor will have to be done (working without the benefits of these new memory types prior net8.0). Remember SMB multi-targets netstandard2.0, net6.0 and net8.0. I don't want to go to netstandard2.1 just yet to have a broader reach.
Yes, this could be added, I suggest to the What are the next steps here @dundich? Let me know. |
Where do you see the Reflections in there?
Would you like anything particular from me (e.g. evolve my draft PR)??
Do you want to start evolving your PR along the lines of #314 with your suggestions above?
ps |
I can help with some of the initial work and perhaps evolve my PR to enable some preparations for your Postgres adaptation of Outbox. I think it could simply be a dialect setting on the existing Outbox.Sql package (no need to create an entirely new package solely for Postgres). The NewId is the introduction of sequential Guids/UUIDs per what you wrote earlier and need. We could have something similar added to the SlimMessageBus.Host, so that other plugins and providers can benefit from that prior NET9 - especially the Outbox. |
I want to create outbox repository for Postgres, and I need the following features to create tables as PARTITION:
for example:
The current implementation in the base class does not allow to do this.
The text was updated successfully, but these errors were encountered: