diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index bd7d4ad69172386c40c13e3997b0e4a3cd2ca057..a871ea895aaf3b2b4506a16bf7ca0ca89037bd5a 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -27,8 +27,8 @@ * immediately, unless flow control prevents it. * If it is throttled and keeps receiving values, as well as if it receives values before being * started, it will buffer them and propagate them in order as soon as its state becomes Started. - * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and - * propagate the error immediately. + * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the + * last buffered value and issue it to the writeable after all buffered values are issued. * * Beware that a pipe of this type can't prevent receiving more values when it is paused (for * example if used to write data to a congested network connection). Because in such situations the diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index e4a7cc40f9fb1922f6fb3359de8a53881206fee5..99cb0ad97139963f20b8d840028d81f5a242b6bd 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -18,11 +18,13 @@ #import "GRXBufferedPipe.h" +@interface GRXBufferedPipe () +@property(atomic) id<GRXWriteable> writeable; +@end + @implementation GRXBufferedPipe { - id<GRXWriteable> _writeable; - NSMutableArray *_queue; - BOOL _inputIsFinished; NSError *_errorOrNil; + dispatch_queue_t _writeQueue; } @synthesize state = _state; @@ -33,99 +35,79 @@ - (instancetype)init { if (self = [super init]) { - _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; + _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + dispatch_suspend(_writeQueue); } return self; } -- (id)popValue { - id value = _queue[0]; - [_queue removeObjectAtIndex:0]; - return value; -} - -- (void)writeBufferUntilPausedOrStopped { - while (_state == GRXWriterStateStarted && _queue.count > 0) { - [_writeable writeValue:[self popValue]]; - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } -} - #pragma mark GRXWriteable implementation -// Returns whether events can be simply propagated to the other end of the pipe. -- (BOOL)shouldFastForward { - return _state == GRXWriterStateStarted && _queue.count == 0; -} - - (void)writeValue:(id)value { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { + if ([value respondsToSelector:@selector(copy)]) { // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. // So just buffer the new value. // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - if ([value respondsToSelector:@selector(copy)]) { - value = [value copy]; - } - [_queue addObject:value]; + value = [value copy]; } + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^(void) { + [weakSelf.writeable writeValue:value]; + }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - _inputIsFinished = YES; - _errorOrNil = errorOrNil; - if (errorOrNil || self.shouldFastForward) { - // No need to write pending values. - [self finishWithError:_errorOrNil]; - } + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + [weakSelf finishWithError:errorOrNil]; + }); } #pragma mark GRXWriter implementation - (void)setState:(GRXWriterState)newState { - // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { - return; - } - - switch (newState) { - case GRXWriterStateFinished: - _state = newState; - _queue = nil; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; - return; - case GRXWriterStatePaused: - _state = newState; + @synchronized (self) { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; - case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + } + + switch (newState) { + case GRXWriterStateFinished: + self.writeable = nil; + if (_state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } _state = newState; - [self writeBufferUntilPausedOrStopped]; - } - return; - case GRXWriterStateNotStarted: - return; + return; + case GRXWriterStatePaused: + if (_state == GRXWriterStateStarted) { + _state = newState; + dispatch_suspend(_writeQueue); + } + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + dispatch_resume(_writeQueue); + } + return; + case GRXWriterStateNotStarted: + return; + } } } - (void)startWithWriteable:(id<GRXWriteable>)writeable { + self.writeable = writeable; _state = GRXWriterStateStarted; - _writeable = writeable; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { - id<GRXWriteable> writeable = _writeable; + [self.writeable writesFinishedWithError:errorOrNil]; self.state = GRXWriterStateFinished; - [writeable writesFinishedWithError:errorOrNil]; } @end diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index f152452b01373dcd7bde2c7f5d4526b2bfd154cd..fa3ded4c0cd9f583dd19a0baf6ffa4d5584118d2 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -23,6 +23,8 @@ #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter.h> +#define TEST_TIMEOUT 1 + // A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and // what were the last values passed to it. // @@ -140,26 +142,38 @@ #pragma mark BufferedPipe - (void)testBufferedPipePropagatesValue { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; + id anyValue = @7; // If: GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; [pipe startWithWriteable:writeable]; [pipe writeValue:anyValue]; + [pipe writesFinishedWithError:nil]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil); + } - (void)testBufferedPipePropagatesError { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil]; // If: @@ -168,15 +182,20 @@ [pipe writesFinishedWithError:anyError]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, nil); XCTAssertEqualObjects(handler.errorOrNil, anyError); } - (void)testBufferedPipeFinishWriteWhilePaused { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; id anyValue = @7; // If: @@ -188,6 +207,7 @@ [pipe startWithWriteable:writeable]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil);