Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADO.NET Grain Directory #9263

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactored
  • Loading branch information
JorgeCandeias committed Dec 7, 2024
commit bdc5cc5486f1eedf9a507b2b811624e3cabe4241
16 changes: 5 additions & 11 deletions src/AdoNet/Orleans.GrainDirectory.AdoNet/AdoNetGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -56,19 +55,14 @@ public async Task<GrainAddress> Register(GrainAddress address)
{
var queries = await GetQueriesAsync();

var count = await queries
// this call is expected to register a new entry or return the existing one if found in a thread safe manner
var entry = await queries
.RegisterGrainActivationAsync(_clusterId, name, address.GrainId.ToString(), address.SiloAddress.ToParsableString(), address.ActivationId.ToParsableString())
.WaitAsync(lifetime.ApplicationStopping);

if (count > 0)
{
LogRegistered(_clusterId, address.GrainId, address.SiloAddress, address.ActivationId);
return address;
}
else
{
return await Lookup(address.GrainId);
}
LogRegistered(_clusterId, address.GrainId, address.SiloAddress, address.ActivationId);

return entry.ToGrainAddress();
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,49 +82,66 @@ DECLARE @Now DATETIMEOFFSET(3) = CAST(SYSUTCDATETIME() AS DATETIMEOFFSET(3));

BEGIN TRANSACTION;

/* Place and hold a lock on the hash range upfront to prevent both duplicates and deadlocks. */
/* This is required to ensure the server always locks the index before it locks the underlying table. */
DECLARE @Locked INT =
(
SELECT COUNT(*)
FROM OrleansGrainDirectory WITH (UPDLOCK, PAGLOCK, HOLDLOCK, INDEX(IX_OrleansGrainDirectory_Lookup))
WHERE
ClusterId = @ClusterId
AND ProviderId = @ProviderId
AND GrainIdHash = @GrainIdHash
);

/* It is now safe to add the entry. */
INSERT INTO OrleansGrainDirectory
(
/* First we check if the entry already exists. */
/* This also induces and holds a lock on the hash index upfront to prevent both duplicates and deadlocks. */
/* This is also required to ensure the server always locks the index before it locks the underlying table upon modification. */
SELECT
ClusterId,
ProviderId,
GrainIdHash,
GrainId,
SiloAddress,
ActivationId,
CreatedOn
)
SELECT
@ClusterId,
@ProviderId,
@GrainIdHash,
@GrainId,
@SiloAddress,
@ActivationId,
@Now
WHERE NOT EXISTS
(
SELECT 1
FROM OrleansGrainDirectory WITH (UPDLOCK, PAGLOCK, HOLDLOCK, INDEX(IX_OrleansGrainDirectory_Lookup))
WHERE
ClusterId = @ClusterId
AND ProviderId = @ProviderId
AND GrainIdHash = @GrainIdHash
AND GrainId = @GrainId
);
FROM
OrleansGrainDirectory WITH (UPDLOCK, PAGLOCK, HOLDLOCK, INDEX(IX_OrleansGrainDirectory_Lookup))
WHERE
ClusterId = @ClusterId
AND ProviderId = @ProviderId
AND GrainIdHash = @GrainIdHash
AND GrainId = @GrainId;

SELECT @@ROWCOUNT;
/* If no current entry was found we can add a new one now. */
IF @@ROWCOUNT = 0
BEGIN
INSERT INTO OrleansGrainDirectory
(
ClusterId,
ProviderId,
GrainIdHash,
GrainId,
SiloAddress,
ActivationId,
CreatedOn
)
OUTPUT
INSERTED.ClusterId,
INSERTED.ProviderId,
INSERTED.GrainId,
INSERTED.SiloAddress,
INSERTED.ActivationId,
INSERTED.CreatedOn
SELECT
@ClusterId,
@ProviderId,
@GrainIdHash,
@GrainId,
@SiloAddress,
@ActivationId,
@Now

/* This check should not be required given we are already holding a lock on the hash. */
/* However it is included here as an extra safety measure. */
WHERE NOT EXISTS
(
SELECT 1
FROM OrleansGrainDirectory WITH (UPDLOCK, PAGLOCK, HOLDLOCK, INDEX(IX_OrleansGrainDirectory_Lookup))
WHERE
ClusterId = @ClusterId
AND ProviderId = @ProviderId
AND GrainIdHash = @GrainIdHash
AND GrainId = @GrainId
);
END

COMMIT;

Expand Down Expand Up @@ -154,8 +171,8 @@ SET XACT_ABORT ON;

BEGIN TRANSACTION;

/* Place and hold a lock on the hash range upfront to prevent both duplicates and deadlocks. */
/* This is required to ensure the server always locks the index before it locks the underlying table. */
/* Induce a lock on the hash index upfront to prevent both duplicates and deadlocks. */
/* This is required to ensure the server always locks the index before it locks the underlying table upon modification. */
DECLARE @Locked INT =
(
SELECT COUNT(*)
Expand All @@ -164,6 +181,7 @@ DECLARE @Locked INT =
ClusterId = @ClusterId
AND ProviderId = @ProviderId
AND GrainIdHash = @GrainIdHash
AND GrainId = @GrainId
);

/* It is now safe to remove the entry. */
Expand Down
10 changes: 7 additions & 3 deletions src/AdoNet/Shared/Storage/RelationalOrleansQueries.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -593,7 +592,7 @@ internal Task EvictStreamDeadLettersAsync(string serviceId, string providerId, s
/// <param name="siloAddress">The silo address.</param>
/// <param name="activationId">The activation identifier.</param>
/// <returns>The count of rows affected.</returns>
internal Task<int> RegisterGrainActivationAsync(string clusterId, string providerId, string grainId, string siloAddress, string activationId)
internal Task<AdoNetGrainDirectoryEntry> RegisterGrainActivationAsync(string clusterId, string providerId, string grainId, string siloAddress, string activationId)
{
ArgumentNullException.ThrowIfNull(clusterId);
ArgumentNullException.ThrowIfNull(providerId);
Expand All @@ -605,7 +604,12 @@ internal Task<int> RegisterGrainActivationAsync(string clusterId, string provide

return ReadAsync(
dbStoredQueries.RegisterGrainActivationKey,
record => record.GetInt32(0),
record => new AdoNetGrainDirectoryEntry(
(string)record[nameof(AdoNetGrainDirectoryEntry.ClusterId)],
(string)record[nameof(AdoNetGrainDirectoryEntry.ProviderId)],
(string)record[nameof(AdoNetGrainDirectoryEntry.GrainId)],
(string)record[nameof(AdoNetGrainDirectoryEntry.SiloAddress)],
(string)record[nameof(AdoNetGrainDirectoryEntry.ActivationId)]),
command => new DbStoredQueries.Columns(command)
{
ClusterId = clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,15 @@ public async Task RelationalOrleansQueries_RegistersActivation()
await _storage.ExecuteAsync("DELETE FROM OrleansGrainDirectory");

// act
var count = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);
var entry = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);

// assert
Assert.Equal(1, count);
Assert.NotNull(entry);
Assert.Equal(clusterId, entry.ClusterId);
Assert.Equal(providerId, entry.ProviderId);
Assert.Equal(grainId, entry.GrainId);
Assert.Equal(siloAddress, entry.SiloAddress);
Assert.Equal(activationId, entry.ActivationId);

var results = await _storage.ReadAsync<AdoNetGrainDirectoryEntry>("SELECT * FROM OrleansGrainDirectory");
var result = Assert.Single(results);
Expand All @@ -117,12 +122,12 @@ public async Task RelationalOrleansQueries_UnregistersActivation()
await _storage.ExecuteAsync("DELETE FROM OrleansGrainDirectory");

// act
var count1 = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);
var count2 = await _queries.UnregisterGrainActivationAsync(clusterId, providerId, grainId, activationId);
var entry = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);
var count = await _queries.UnregisterGrainActivationAsync(clusterId, providerId, grainId, activationId);

// assert
Assert.Equal(1, count1);
Assert.Equal(1, count2);
Assert.NotNull(entry);
Assert.Equal(1, count);

var results = await _storage.ReadAsync<AdoNetGrainDirectoryEntry>("SELECT * FROM OrleansGrainDirectory");
Assert.Empty(results);
Expand All @@ -144,11 +149,11 @@ public async Task RelationalOrleansQueries_LooksUpActivation()
await _storage.ExecuteAsync("DELETE FROM OrleansGrainDirectory");

// act
var count = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);
var entry = await _queries.RegisterGrainActivationAsync(clusterId, providerId, grainId, siloAddress, activationId);
var result = await _queries.LookupGrainActivationAsync(clusterId, providerId, grainId);

// assert
Assert.Equal(1, count);
Assert.NotNull(entry);
Assert.NotNull(result);
Assert.Equal(clusterId, result.ClusterId);
Assert.Equal(providerId, result.ProviderId);
Expand Down