Skip to content

Commit

Permalink
Merge pull request grpc#6991 from jtattermusch/csharp_fix_compressed_…
Browse files Browse the repository at this point in the history
…read

Fix compressed reads in C# and implement some compression interop tests.
  • Loading branch information
jtattermusch authored Jun 22, 2016
2 parents acbf8c2 + 606e35a commit c9396aa
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 142 deletions.
26 changes: 26 additions & 0 deletions src/csharp/Grpc.Core.Tests/CompressionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
Expand Down Expand Up @@ -118,5 +119,30 @@ public async Task WriteOptions_DuplexStreaming()

await call.ResponseStream.ToListAsync();
}

[Test]
public void CanReadCompressedMessages()
{
var compressionMetadata = new Metadata
{
{ new Metadata.Entry(Metadata.CompressionRequestAlgorithmMetadataKey, "gzip") }
};

helper.UnaryHandler = new UnaryServerMethod<string, string>(async (req, context) =>
{
await context.WriteResponseHeadersAsync(compressionMetadata);
return req;
});

var stringBuilder = new StringBuilder();
for (int i = 0; i < 200000; i++)
{
stringBuilder.Append('a');
}
var request = stringBuilder.ToString();
var response = Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(compressionMetadata)), request);

Assert.AreEqual(request, response);
}
}
}
7 changes: 7 additions & 0 deletions src/csharp/Grpc.Core/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public sealed class Metadata : IList<Metadata.Entry>
/// </summary>
public static readonly Metadata Empty = new Metadata().Freeze();

/// <summary>
/// To be used in initial metadata to request specific compression algorithm
/// for given call. Direct selection of compression algorithms is an internal
/// feature and is not part of public API.
/// </summary>
internal const string CompressionRequestAlgorithmMetadataKey = "grpc-internal-encoding-request";

readonly List<Entry> entries;
bool readOnly;

Expand Down
131 changes: 108 additions & 23 deletions src/csharp/Grpc.IntegrationTesting/InteropClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ private async Task RunTestCaseAsync(Channel channel, ClientOptions options)
case "unimplemented_method":
RunUnimplementedMethod(new UnimplementedService.UnimplementedServiceClient(channel));
break;
case "client_compressed_unary":
RunClientCompressedUnary(client);
break;
case "client_compressed_streaming":
await RunClientCompressedStreamingAsync(client);
break;
default:
throw new ArgumentException("Unknown test case " + options.TestCase);
}
Expand All @@ -240,13 +246,11 @@ public static void RunLargeUnary(TestService.TestServiceClient client)
Console.WriteLine("running large_unary");
var request = new SimpleRequest
{
ResponseType = PayloadType.Compressable,
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response = client.UnaryCall(request);

Assert.AreEqual(PayloadType.Compressable, response.Payload.Type);
Assert.AreEqual(314159, response.Payload.Body.Length);
Console.WriteLine("Passed!");
}
Expand Down Expand Up @@ -275,17 +279,12 @@ public static async Task RunServerStreamingAsync(TestService.TestServiceClient c

var request = new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { bodySizes.Select((size) => new ResponseParameters { Size = size }) }
};

using (var call = client.StreamingOutputCall(request))
{
var responseList = await call.ResponseStream.ToListAsync();
foreach (var res in responseList)
{
Assert.AreEqual(PayloadType.Compressable, res.Payload.Type);
}
CollectionAssert.AreEqual(bodySizes, responseList.Select((item) => item.Payload.Body.Length));
}
Console.WriteLine("Passed!");
Expand All @@ -299,46 +298,38 @@ public static async Task RunPingPongAsync(TestService.TestServiceClient client)
{
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
});

Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);

await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 9 } },
Payload = CreateZerosPayload(8)
});

Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);

await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 2653 } },
Payload = CreateZerosPayload(1828)
});

Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);

await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 58979 } },
Payload = CreateZerosPayload(45904)
});

Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);

await call.RequestStream.CompleteAsync();
Expand Down Expand Up @@ -367,7 +358,6 @@ public static void RunComputeEngineCreds(TestService.TestServiceClient client, s

var request = new SimpleRequest
{
ResponseType = PayloadType.Compressable,
ResponseSize = 314159,
Payload = CreateZerosPayload(271828),
FillUsername = true,
Expand All @@ -377,7 +367,6 @@ public static void RunComputeEngineCreds(TestService.TestServiceClient client, s
// not setting credentials here because they were set on channel already
var response = client.UnaryCall(request);

Assert.AreEqual(PayloadType.Compressable, response.Payload.Type);
Assert.AreEqual(314159, response.Payload.Body.Length);
Assert.False(string.IsNullOrEmpty(response.OauthScope));
Assert.True(oauthScope.Contains(response.OauthScope));
Expand All @@ -391,7 +380,6 @@ public static void RunJwtTokenCreds(TestService.TestServiceClient client)

var request = new SimpleRequest
{
ResponseType = PayloadType.Compressable,
ResponseSize = 314159,
Payload = CreateZerosPayload(271828),
FillUsername = true,
Expand All @@ -400,7 +388,6 @@ public static void RunJwtTokenCreds(TestService.TestServiceClient client)
// not setting credentials here because they were set on channel already
var response = client.UnaryCall(request);

Assert.AreEqual(PayloadType.Compressable, response.Payload.Type);
Assert.AreEqual(314159, response.Payload.Body.Length);
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
Expand Down Expand Up @@ -480,13 +467,11 @@ public static async Task RunCancelAfterFirstResponseAsync(TestService.TestServic
{
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
});

Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type);
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);

cts.Cancel();
Expand Down Expand Up @@ -546,7 +531,6 @@ public static async Task RunCustomMetadataAsync(TestService.TestServiceClient cl
// step 1: test unary call
var request = new SimpleRequest
{
ResponseType = PayloadType.Compressable,
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
Expand All @@ -565,7 +549,6 @@ public static async Task RunCustomMetadataAsync(TestService.TestServiceClient cl
// step 2: test full duplex call
var request = new StreamingOutputCallRequest
{
ResponseType = PayloadType.Compressable,
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
};
Expand Down Expand Up @@ -638,11 +621,113 @@ public static void RunUnimplementedMethod(UnimplementedService.UnimplementedServ
Console.WriteLine("Passed!");
}

public static void RunClientCompressedUnary(TestService.TestServiceClient client)
{
Console.WriteLine("running client_compressed_unary");
var probeRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = true // lie about compression
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var e = Assert.Throws<RpcException>(() => client.UnaryCall(probeRequest, CreateClientCompressionMetadata(false)));
Assert.AreEqual(StatusCode.InvalidArgument, e.Status.StatusCode);

var compressedRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response1 = client.UnaryCall(compressedRequest, CreateClientCompressionMetadata(true));
Assert.AreEqual(314159, response1.Payload.Body.Length);

var uncompressedRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = false
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response2 = client.UnaryCall(uncompressedRequest, CreateClientCompressionMetadata(false));
Assert.AreEqual(314159, response2.Payload.Body.Length);

Console.WriteLine("Passed!");
}

public static async Task RunClientCompressedStreamingAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running client_compressed_streaming");
try
{
var probeCall = client.StreamingInputCall(CreateClientCompressionMetadata(false));
await probeCall.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
Payload = CreateZerosPayload(27182)
});

// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await probeCall;
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.InvalidArgument, e.Status.StatusCode);
}

var call = client.StreamingInputCall(CreateClientCompressionMetadata(true));
await call.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
Payload = CreateZerosPayload(27182)
});

call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await call.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = false
},
Payload = CreateZerosPayload(45904)
});
await call.RequestStream.CompleteAsync();

var response = await call.ResponseAsync;
Assert.AreEqual(73086, response.AggregatedPayloadSize);

Console.WriteLine("Passed!");
}

private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
}

private static Metadata CreateClientCompressionMetadata(bool compressed)
{
var algorithmName = compressed ? "gzip" : "identity";
return new Metadata
{
{ new Metadata.Entry(Metadata.CompressionRequestAlgorithmMetadataKey, algorithmName) }
};
}

// extracts the client_email field from service account file used for auth test cases
private static string GetEmailFromServiceAccountFile()
{
Expand Down
Loading

0 comments on commit c9396aa

Please sign in to comment.