diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index 8de0997ebeae33f503c725219c1690570bab4c3c..5178115e44c67995241d93d5289361b59a31aff3 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -68,6 +68,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { ::grpc::string imports = ::grpc::string("#import \"") + file_name + ".pbobjc.h\"\n\n" "#import <ProtoRPC/ProtoService.h>\n" + "#import <ProtoRPC/ProtoRPC.h>\n" "#import <RxLibrary/GRXWriteable.h>\n" "#import <RxLibrary/GRXWriter.h>\n"; diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 7645bb1d34a99d2c6eca10f8ac71e9622f896414..5e9324c445627b2f36658cf3980d8e5590ae9e82 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -253,6 +253,13 @@ extern id const kGRPCTrailersKey; */ + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path; +/** + * Set the dispatch queue to be used for callbacks. + * + * This configuration is only effective before the call starts. + */ +- (void)setResponseDispatchQueue:(dispatch_queue_t)queue; + // TODO(jcanizales): Let specify a deadline. As a category of GRXWriter? @end diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 051138ea4da3ec7235e1996aea99cad81460923f..f9d13fea578b7a5124e0ec27d280b9bfc088c182 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -113,6 +113,10 @@ static NSMutableDictionary *callFlags; // the SendClose op is added. BOOL _unaryCall; NSMutableArray *_unaryOpBatch; + + // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch + // queue + dispatch_queue_t _responseQueue; } @synthesize state = _state; @@ -175,10 +179,19 @@ static NSMutableDictionary *callFlags; _unaryCall = YES; _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch]; } + + _responseQueue = dispatch_get_main_queue(); } return self; } +- (void)setResponseDispatchQueue:(dispatch_queue_t)queue { + if (_state != GRXWriterStateNotStarted) { + return; + } + _responseQueue = queue; +} + #pragma mark Finish - (void)finishWithError:(NSError *)errorOrNil { @@ -424,7 +437,8 @@ static NSMutableDictionary *callFlags; // that the life of the instance is determined by this retain cycle. _retainSelf = self; - _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; + _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable + dispatchQueue:_responseQueue]; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path]; NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index b2775f98b56e5c19039d4eeb075e6dbf7b0a0562..07004f6d4dc66e7eb0fb4dc7c6d5c6479524b262 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -53,7 +53,9 @@ * The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released * after that. */ -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable + dispatchQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable; /** * Enqueues writeValue: to be sent to the writeable in the main thread. diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 08bd079aea59121f50e0040f9ce0350eb42c5485..88aa7a7282fbf7b94eba5e46dc2a8bd7f11cbdaa 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -51,14 +51,20 @@ } // Designated initializer -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable + dispatchQueue:(dispatch_queue_t)queue { if (self = [super init]) { - _writeableQueue = dispatch_get_main_queue(); + _writeableQueue = queue; _writeable = writeable; } return self; } +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { + return [self initWithWriteable:writeable + dispatchQueue:dispatch_get_main_queue()]; +} + - (void)enqueueValue:(id)value completionHandler:(void (^)())handler { dispatch_async(_writeableQueue, ^{ // We're racing a possible cancellation performed by another thread. To turn all already- diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 76c15003f603a5b97c5f261db5f645c09cc7e335..e36f5c3ee937e81db6148079988b69f8c91f858a 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -353,4 +353,59 @@ static GRPCProtoMethod *kUnaryCallMethod; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +- (void)testAlternateDispatchQueue { + const int32_t kPayloadSize = 100; + RMTSimpleRequest *request = [RMTSimpleRequest message]; + request.responseSize = kPayloadSize; + + __weak XCTestExpectation *expectation1 = [self expectationWithDescription:@"AlternateDispatchQueue1"]; + + // Use default (main) dispatch queue + NSString *main_queue_label = [NSString stringWithUTF8String:dispatch_queue_get_label(dispatch_get_main_queue())]; + + GRXWriter *requestsWriter1 = [GRXWriter writerWithValue:[request data]]; + + GRPCCall *call1 = [[GRPCCall alloc] initWithHost:kHostAddress + path:kUnaryCallMethod.HTTPPath + requestsWriter:requestsWriter1]; + + id<GRXWriteable> responsesWriteable1 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)]; + XCTAssert([label isEqualToString:main_queue_label]); + + [expectation1 fulfill]; + } completionHandler:^(NSError *errorOrNil) { + }]; + + [call1 startWithWriteable:responsesWriteable1]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; + + // Use a custom queue + __weak XCTestExpectation *expectation2 = [self expectationWithDescription:@"AlternateDispatchQueue2"]; + + NSString *queue_label = @"test.queue1"; + dispatch_queue_t queue = dispatch_queue_create([queue_label UTF8String], DISPATCH_QUEUE_SERIAL); + + GRXWriter *requestsWriter2 = [GRXWriter writerWithValue:[request data]]; + + GRPCCall *call2 = [[GRPCCall alloc] initWithHost:kHostAddress + path:kUnaryCallMethod.HTTPPath + requestsWriter:requestsWriter2]; + + [call2 setResponseDispatchQueue:queue]; + + id<GRXWriteable> responsesWriteable2 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)]; + XCTAssert([label isEqualToString:queue_label]); + + [expectation2 fulfill]; + } completionHandler:^(NSError *errorOrNil) { + }]; + + [call2 startWithWriteable:responsesWriteable2]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; +} + @end