Skip to content

Commit

Permalink
Merge pull request grpc#5314 from jtattermusch/csharp_perf_removehost
Browse files Browse the repository at this point in the history
C# qps worker updates
  • Loading branch information
vjpai committed Feb 23, 2016
2 parents 92f19ef + 13b63a0 commit 4938274
Show file tree
Hide file tree
Showing 7 changed files with 470 additions and 68 deletions.
419 changes: 365 additions & 54 deletions src/csharp/Grpc.IntegrationTesting/Control.cs

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#region Copyright notice and license

// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -85,24 +85,27 @@ public static void Run(string[] args)
}

var workerServer = new QpsWorker(options);
workerServer.Run();
workerServer.RunAsync().Wait();
}

private void Run()
private async Task RunAsync()
{
string host = "0.0.0.0";
int port = options.DriverPort;

var tcs = new TaskCompletionSource<object>();
var workerServiceImpl = new WorkerServiceImpl(() => { Task.Run(() => tcs.SetResult(null)); });

var server = new Server
{
Services = { WorkerService.BindService(new WorkerServiceImpl()) },
Services = { WorkerService.BindService(workerServiceImpl) },
Ports = { new ServerPort(host, options.DriverPort, ServerCredentials.Insecure )}
};
int boundPort = server.Ports.Single().BoundPort;
Console.WriteLine("Running qps worker server on " + string.Format("{0}:{1}", host, boundPort));
server.Start();

server.ShutdownTask.Wait();
await tcs.Task;
await server.ShutdownAsync();
}
}
}
6 changes: 2 additions & 4 deletions src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#region Copyright notice and license

// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -48,7 +48,6 @@ namespace Grpc.IntegrationTesting
/// </summary>
public class RunnerClientServerTest
{
const string Host = "localhost";
IServerRunner serverRunner;

[TestFixtureSetUp]
Expand All @@ -57,7 +56,6 @@ public void Init()
var serverConfig = new ServerConfig
{
ServerType = ServerType.ASYNC_SERVER,
Host = Host,
PayloadConfig = new PayloadConfig
{
SimpleParams = new SimpleProtoParams
Expand All @@ -83,7 +81,7 @@ public async Task ClientServerRunner()
{
var config = new ClientConfig
{
ServerTargets = { string.Format("{0}:{1}", Host, serverRunner.BoundPort) },
ServerTargets = { string.Format("{0}:{1}", "localhost", serverRunner.BoundPort) },
RpcType = RpcType.UNARY,
LoadParams = new LoadParams { ClosedLoop = new ClosedLoopParams() },
PayloadConfig = new PayloadConfig
Expand Down
2 changes: 1 addition & 1 deletion src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static IServerRunner CreateStarted(ServerConfig config)
var server = new Server
{
Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
Ports = { new ServerPort(config.Host, config.Port, credentials) }
Ports = { new ServerPort("[::]", config.Port, credentials) }
};

server.Start();
Expand Down
7 changes: 5 additions & 2 deletions src/csharp/Grpc.IntegrationTesting/Services.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ static ServicesReflection() {
"QmVuY2htYXJrU2VydmljZRJGCglVbmFyeUNhbGwSGy5ncnBjLnRlc3Rpbmcu",
"U2ltcGxlUmVxdWVzdBocLmdycGMudGVzdGluZy5TaW1wbGVSZXNwb25zZRJO",
"Cg1TdHJlYW1pbmdDYWxsEhsuZ3JwYy50ZXN0aW5nLlNpbXBsZVJlcXVlc3Qa",
"HC5ncnBjLnRlc3RpbmcuU2ltcGxlUmVzcG9uc2UoATABMp0BCg1Xb3JrZXJT",
"HC5ncnBjLnRlc3RpbmcuU2ltcGxlUmVzcG9uc2UoATABMpcCCg1Xb3JrZXJT",
"ZXJ2aWNlEkUKCVJ1blNlcnZlchIYLmdycGMudGVzdGluZy5TZXJ2ZXJBcmdz",
"GhouZ3JwYy50ZXN0aW5nLlNlcnZlclN0YXR1cygBMAESRQoJUnVuQ2xpZW50",
"EhguZ3JwYy50ZXN0aW5nLkNsaWVudEFyZ3MaGi5ncnBjLnRlc3RpbmcuQ2xp",
"ZW50U3RhdHVzKAEwAWIGcHJvdG8z"));
"ZW50U3RhdHVzKAEwARJCCglDb3JlQ291bnQSGS5ncnBjLnRlc3RpbmcuQ29y",
"ZVJlcXVlc3QaGi5ncnBjLnRlc3RpbmcuQ29yZVJlc3BvbnNlEjQKClF1aXRX",
"b3JrZXISEi5ncnBjLnRlc3RpbmcuVm9pZBoSLmdycGMudGVzdGluZy5Wb2lk",
"YgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Grpc.Testing.MessagesReflection.Descriptor, global::Grpc.Testing.ControlReflection.Descriptor, },
new pbr::GeneratedCodeInfo(null, null));
Expand Down
71 changes: 70 additions & 1 deletion src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public static class WorkerService
static readonly Marshaller<global::Grpc.Testing.ServerStatus> __Marshaller_ServerStatus = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ServerStatus.Parser.ParseFrom);
static readonly Marshaller<global::Grpc.Testing.ClientArgs> __Marshaller_ClientArgs = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientArgs.Parser.ParseFrom);
static readonly Marshaller<global::Grpc.Testing.ClientStatus> __Marshaller_ClientStatus = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientStatus.Parser.ParseFrom);
static readonly Marshaller<global::Grpc.Testing.CoreRequest> __Marshaller_CoreRequest = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreRequest.Parser.ParseFrom);
static readonly Marshaller<global::Grpc.Testing.CoreResponse> __Marshaller_CoreResponse = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreResponse.Parser.ParseFrom);
static readonly Marshaller<global::Grpc.Testing.Void> __Marshaller_Void = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Void.Parser.ParseFrom);

static readonly Method<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> __Method_RunServer = new Method<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus>(
MethodType.DuplexStreaming,
Expand All @@ -129,6 +132,20 @@ public static class WorkerService
__Marshaller_ClientArgs,
__Marshaller_ClientStatus);

static readonly Method<global::Grpc.Testing.CoreRequest, global::Grpc.Testing.CoreResponse> __Method_CoreCount = new Method<global::Grpc.Testing.CoreRequest, global::Grpc.Testing.CoreResponse>(
MethodType.Unary,
__ServiceName,
"CoreCount",
__Marshaller_CoreRequest,
__Marshaller_CoreResponse);

static readonly Method<global::Grpc.Testing.Void, global::Grpc.Testing.Void> __Method_QuitWorker = new Method<global::Grpc.Testing.Void, global::Grpc.Testing.Void>(
MethodType.Unary,
__ServiceName,
"QuitWorker",
__Marshaller_Void,
__Marshaller_Void);

// service descriptor
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
Expand All @@ -142,13 +159,23 @@ public interface IWorkerServiceClient
AsyncDuplexStreamingCall<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> RunServer(CallOptions options);
AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(CallOptions options);
global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, CallOptions options);
AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, CallOptions options);
global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, CallOptions options);
AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, CallOptions options);
}

// server-side interface
public interface IWorkerService
{
Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context);
Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context);
Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context);
Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context);
}

// client stub
Expand Down Expand Up @@ -177,14 +204,56 @@ public WorkerServiceClient(Channel channel) : base(channel)
var call = CreateCall(__Method_RunClient, options);
return Calls.AsyncDuplexStreamingCall(call);
}
public global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__Method_CoreCount, new CallOptions(headers, deadline, cancellationToken));
return Calls.BlockingUnaryCall(call, request);
}
public global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, CallOptions options)
{
var call = CreateCall(__Method_CoreCount, options);
return Calls.BlockingUnaryCall(call, request);
}
public AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__Method_CoreCount, new CallOptions(headers, deadline, cancellationToken));
return Calls.AsyncUnaryCall(call, request);
}
public AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, CallOptions options)
{
var call = CreateCall(__Method_CoreCount, options);
return Calls.AsyncUnaryCall(call, request);
}
public global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__Method_QuitWorker, new CallOptions(headers, deadline, cancellationToken));
return Calls.BlockingUnaryCall(call, request);
}
public global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, CallOptions options)
{
var call = CreateCall(__Method_QuitWorker, options);
return Calls.BlockingUnaryCall(call, request);
}
public AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__Method_QuitWorker, new CallOptions(headers, deadline, cancellationToken));
return Calls.AsyncUnaryCall(call, request);
}
public AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, CallOptions options)
{
var call = CreateCall(__Method_QuitWorker, options);
return Calls.AsyncUnaryCall(call, request);
}
}

// creates service definition that can be registered with a server
public static ServerServiceDefinition BindService(IWorkerService serviceImpl)
{
return ServerServiceDefinition.CreateBuilder(__ServiceName)
.AddMethod(__Method_RunServer, serviceImpl.RunServer)
.AddMethod(__Method_RunClient, serviceImpl.RunClient).Build();
.AddMethod(__Method_RunClient, serviceImpl.RunClient)
.AddMethod(__Method_CoreCount, serviceImpl.CoreCount)
.AddMethod(__Method_QuitWorker, serviceImpl.QuitWorker).Build();
}

// creates a new client
Expand Down
18 changes: 18 additions & 0 deletions src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ namespace Grpc.Testing
/// </summary>
public class WorkerServiceImpl : WorkerService.IWorkerService
{
readonly Action stopRequestHandler;

public WorkerServiceImpl(Action stopRequestHandler)
{
this.stopRequestHandler = Grpc.Core.Utils.Preconditions.CheckNotNull(stopRequestHandler);
}

public async Task RunServer(IAsyncStreamReader<ServerArgs> requestStream, IServerStreamWriter<ServerStatus> responseStream, ServerCallContext context)
{
GrpcPreconditions.CheckState(await requestStream.MoveNext());
Expand Down Expand Up @@ -92,5 +99,16 @@ await responseStream.WriteAsync(new ClientStatus
}
await runner.StopAsync();
}

public Task<CoreResponse> CoreCount(CoreRequest request, ServerCallContext context)
{
return Task.FromResult(new CoreResponse { Cores = Environment.ProcessorCount });
}

public Task<Void> QuitWorker(Void request, ServerCallContext context)
{
stopRequestHandler();
return Task.FromResult(new Void());
}
}
}

0 comments on commit 4938274

Please sign in to comment.