Skip to content

Commit

Permalink
Merge pull request grpc#1647 from jcanizales/buffered-pipe-and-ping-p…
Browse files Browse the repository at this point in the history
…ong-test

Adds GRXBufferedPipe and the PingPong interop test using it
  • Loading branch information
murgatroid99 committed May 19, 2015
2 parents 626b4d1 + 42e47b4 commit 9a688df
Show file tree
Hide file tree
Showing 3 changed files with 292 additions and 11 deletions.
59 changes: 59 additions & 0 deletions src/objective-c/RxLibrary/GRXBufferedPipe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#import <Foundation/Foundation.h>

#import "GRXWriteable.h"
#import "GRXWriter.h"

// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
// to -startWithWriteable:).
// Once it is started, whatever values are written into it (via -didReceiveValue:) will be
// propagated immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
// started, it will buffer them and propagate them in order as soon as its state becomes
// GRXWriterStateStarted.
// If it receives an error (via -didFinishWithError:), it will drop any buffered values and
// propagate the error immediately.
//
// Beware that a pipe of this type can't prevent receiving more values when it is paused (for
// example if used to write data to a congested network connection). Because in such situations the
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter>

// Convenience constructor.
+ (instancetype)pipe;

@end
146 changes: 146 additions & 0 deletions src/objective-c/RxLibrary/GRXBufferedPipe.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#import "GRXBufferedPipe.h"

@implementation GRXBufferedPipe {
id<GRXWriteable> _writeable;
NSMutableArray *_queue;
BOOL _inputIsFinished;
NSError *_errorOrNil;
}

@synthesize state = _state;

+ (instancetype)pipe {
return [[self alloc] init];
}

- (instancetype)init {
if (self = [super init]) {
_queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
}
return self;
}

- (id)popValue {
id value = _queue[0];
[_queue removeObjectAtIndex:0];
return value;
}

- (void)writeBufferUntilPausedOrStopped {
while (_state == GRXWriterStateStarted && _queue.count > 0) {
[_writeable didReceiveValue:[self popValue]];
}
if (_inputIsFinished && _queue.count == 0) {
// Our writer finished normally while we were paused or not-started-yet.
[self finishWithError:_errorOrNil];
}
}

#pragma mark GRXWriteable implementation

// Returns whether events can be simply propagated to the other end of the pipe.
- (BOOL)shouldFastForward {
return _state == GRXWriterStateStarted && _queue.count == 0;
}

- (void)didReceiveValue:(id)value {
if (self.shouldFastForward) {
// Skip the queue.
[_writeable didReceiveValue:value];
} else {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
if ([value respondsToSelector:@selector(copy)]) {
value = [value copy];
}
[_queue addObject:value];
}
}

- (void)didFinishWithError:(NSError *)errorOrNil {
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
if (errorOrNil || self.shouldFastForward) {
// No need to write pending values.
[self finishWithError:_errorOrNil];
}
}

#pragma mark GRXWriter implementation

- (void)setState:(GRXWriterState)newState {
// Manual transitions are only allowed from the started or paused states.
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
}

switch (newState) {
case GRXWriterStateFinished:
_state = newState;
_queue = nil;
// Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
// writeable to be messaged anymore.
_writeable = nil;
return;
case GRXWriterStatePaused:
_state = newState;
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
_state = newState;
[self writeBufferUntilPausedOrStopped];
}
return;
case GRXWriterStateNotStarted:
return;
}
}

- (void)startWithWriteable:(id<GRXWriteable>)writeable {
_state = GRXWriterStateStarted;
_writeable = writeable;
[self writeBufferUntilPausedOrStopped];
}

- (void)finishWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
self.state = GRXWriterStateFinished;
[writeable didFinishWithError:errorOrNil];
}

@end
98 changes: 87 additions & 11 deletions src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,46 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>

#import <gRPC/ProtoRPC.h>
#import <gRPC/GRXWriter+Immediate.h>
#import <gRPC/GRXBufferedPipe.h>
#import <gRPC/ProtoRPC.h>
#import <RemoteTest/Empty.pbobjc.h>
#import <RemoteTest/Messages.pbobjc.h>
#import <RemoteTest/Test.pbobjc.h>
#import <RemoteTest/Test.pbrpc.h>

// Convenience constructors for the generated proto messages:

@interface RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize;
@end

@implementation RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize {
RMTStreamingOutputCallRequest *request = [self message];
RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = responseSize.integerValue;
[request.responseParametersArray addObject:parameters];
request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return request;
}
@end

@interface RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
@end

@implementation RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
RMTStreamingOutputCallResponse * response = [self message];
response.payload.type = RMTPayloadType_Compressable;
response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return response;
}
@end

@interface RemoteProtoTests : XCTestCase
@end

Expand Down Expand Up @@ -70,7 +103,7 @@ - (void)testEmptyUnaryRPC {
[expectation fulfill];
}];

[self waitForExpectationsWithTimeout:2. handler:nil];
[self waitForExpectationsWithTimeout:2 handler:nil];
}

- (void)testLargeUnaryRPC {
Expand All @@ -92,7 +125,7 @@ - (void)testLargeUnaryRPC {
[expectation fulfill];
}];

[self waitForExpectationsWithTimeout:4. handler:nil];
[self waitForExpectationsWithTimeout:4 handler:nil];
}

- (void)testClientStreamingRPC {
Expand Down Expand Up @@ -124,7 +157,7 @@ - (void)testClientStreamingRPC {
[expectation fulfill];
}];

[self waitForExpectationsWithTimeout:4. handler:nil];
[self waitForExpectationsWithTimeout:4 handler:nil];
}

- (void)testServerStreamingRPC {
Expand All @@ -149,10 +182,7 @@ - (void)testServerStreamingRPC {

if (response) {
XCTAssertLessThan(index, 4, @"More than 4 responses received.");
RMTStreamingOutputCallResponse * expected = [RMTStreamingOutputCallResponse message];
expected.payload.type = RMTPayloadType_Compressable;
int expectedSize = [expectedSizes[index] unsignedIntegerValue];
expected.payload.body = [NSMutableData dataWithLength:expectedSize];
id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:expectedSizes[index]];
XCTAssertEqualObjects(response, expected);
index += 1;
}
Expand All @@ -166,6 +196,49 @@ - (void)testServerStreamingRPC {
[self waitForExpectationsWithTimeout:4 handler:nil];
}

- (void)testPingPongRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];

NSArray *requests = @[@27182, @8, @1828, @45904];
NSArray *responses = @[@31415, @9, @2653, @58979];

GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

__block int index = 0;

id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer didReceiveValue:request];

[_service fullDuplexCallWithRequestsWriter:requestsBuffer
handler:^(BOOL done,
RMTStreamingOutputCallResponse *response,
NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
XCTAssertTrue(done || response, @"Event handler called without an event.");

if (response) {
XCTAssertLessThan(index, 4, @"More than 4 responses received.");
id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
XCTAssertEqualObjects(response, expected);
index += 1;
if (index < 4) {
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer didReceiveValue:request];
} else {
[requestsBuffer didFinishWithError:nil];
}
}

if (done) {
XCTAssertEqual(index, 4, @"Received %i responses instead of 4.", index);
[expectation fulfill];
}
}];
[self waitForExpectationsWithTimeout:2 handler:nil];
}

- (void)testEmptyStreamRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
[_service fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter]
Expand All @@ -176,13 +249,16 @@ - (void)testEmptyStreamRPC {
XCTAssert(done, @"Unexpected response: %@", response);
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:4 handler:nil];
[self waitForExpectationsWithTimeout:2 handler:nil];
}

- (void)testCancelAfterBeginRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];
// TODO(mlumish): change to writing that blocks instead of writing
ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:[GRXWriter emptyWriter]

// A buffered pipe to which we never write any value acts as a writer that just hangs.
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
handler:^(RMTStreamingInputCallResponse *response,
NSError *error) {
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
Expand Down

0 comments on commit 9a688df

Please sign in to comment.