Skip to content

Commit

Permalink
refcounting completion queue shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
jtattermusch committed Dec 17, 2015
1 parent 416000f commit ea7469d
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest

public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);

server.AddCallReference(this);
InitializeInternal(call);
Expand Down
33 changes: 29 additions & 4 deletions src/csharp/Grpc.Core/Internal/AtomicCounter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,39 @@ internal class AtomicCounter
{
long counter = 0;

public void Increment()
public AtomicCounter(long initialCount = 0)
{
Interlocked.Increment(ref counter);
this.counter = initialCount;
}

public void Decrement()
public long Increment()
{
Interlocked.Decrement(ref counter);
return Interlocked.Increment(ref counter);
}

public void IncrementIfNonzero(ref bool success)
{
long origValue = counter;
while (true)
{
if (origValue == 0)
{
success = false;
return;
}
long result = Interlocked.CompareExchange(ref counter, origValue + 1, origValue);
if (result == origValue)
{
success = true;
return;
};
origValue = result;
}
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
Expand Down
105 changes: 70 additions & 35 deletions src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall

const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;

[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
Expand Down Expand Up @@ -112,9 +113,10 @@ private CallSafeHandle()
{
}

public void SetCompletionRegistry(CompletionRegistry completionRegistry)
public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
{
this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}

public void SetCredentials(CallCredentialsSafeHandle credentials)
Expand All @@ -124,10 +126,13 @@ public void SetCredentials(CallCredentialsSafeHandle credentials)

public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
}

public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
Expand All @@ -141,72 +146,102 @@ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArray

public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}

public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}

public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}

public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}

public void StartSendCloseFromClient(SendCompletionHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}

public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
}

public void StartReceiveMessage(ReceivedMessageHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
grpcsharp_call_recv_message(this, ctx).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}

public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}

public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
grpcsharp_call_start_serverside(this, ctx).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}

public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}

public void Cancel()
Expand Down
2 changes: 1 addition & 1 deletion src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle par
{
result.SetCredentials(credentials);
}
result.SetCompletionRegistry(registry);
result.Initialize(registry, cq);
return result;
}
}
Expand Down
54 changes: 53 additions & 1 deletion src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
using System.Threading.Tasks;
using Grpc.Core.Profiling;

using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_completion_queue from <c>grpc/grpc.h</c>
/// </summary>
internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
{
AtomicCounter shutdownRefcount = new AtomicCounter(1);

[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();

Expand All @@ -62,6 +66,7 @@ private CompletionQueueSafeHandle()
public static CompletionQueueSafeHandle Create()
{
return grpcsharp_completion_queue_create();

}

public CompletionQueueEvent Next()
Expand All @@ -77,15 +82,62 @@ public CompletionQueueEvent Pluck(IntPtr tag)
}
}

/// <summary>
/// Creates a new usage scope for this completion queue. Once successfully created,
/// the completion queue won't be shutdown before scope.Dispose() is called.
/// </summary>
public UsageScope NewScope()
{
return new UsageScope(this);
}

public void Shutdown()
{
grpcsharp_completion_queue_shutdown(this);
DecrementShutdownRefcount();
}

protected override bool ReleaseHandle()
{
grpcsharp_completion_queue_destroy(handle);
return true;
}

private void DecrementShutdownRefcount()
{
if (shutdownRefcount.Decrement() == 0)
{
grpcsharp_completion_queue_shutdown(this);
}
}

private void BeginOp()
{
bool success = false;
shutdownRefcount.IncrementIfNonzero(ref success);
Preconditions.CheckState(success, "Shutdown has already been called");
}

private void EndOp()
{
DecrementShutdownRefcount();
}

// Allows declaring BeginOp and EndOp of a completion queue with a using statement.
// Declared as struct for better performance.
public struct UsageScope : IDisposable
{
readonly CompletionQueueSafeHandle cq;

public UsageScope(CompletionQueueSafeHandle cq)
{
this.cq = cq;
this.cq.BeginOp();
}

public void Dispose()
{
cq.EndOp();
}
}
}
}

0 comments on commit ea7469d

Please sign in to comment.