diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 324b6825102a202f09244eca97d872adedbbeac3..96749cda14ac2f798e3414480bf61e5a824e63f4 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests
         public void AsyncUnary_StreamingOperationsNotAllowed()
         {
             asyncCall.UnaryCallAsync("request1");
-            Assert.Throws(typeof(InvalidOperationException),
-                () => asyncCall.StartReadMessage((x,y) => {}));
+            Assert.ThrowsAsync(typeof(InvalidOperationException),
+                async () => await asyncCall.ReadMessageAsync());
             Assert.Throws(typeof(InvalidOperationException),
                 () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
         }
@@ -123,8 +123,8 @@ namespace Grpc.Core.Internal.Tests
         public void ClientStreaming_StreamingReadNotAllowed()
         {
             asyncCall.ClientStreamingCallAsync();
-            Assert.Throws(typeof(InvalidOperationException),
-                () => asyncCall.StartReadMessage((x,y) => {}));
+            Assert.ThrowsAsync(typeof(InvalidOperationException),
+                async () => await asyncCall.ReadMessageAsync());
         }
 
         [Test]
@@ -182,9 +182,6 @@ namespace Grpc.Core.Internal.Tests
             var responseStream = new ClientResponseStream<string, string>(asyncCall);
             var readTask = responseStream.MoveNext();
 
-            fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
-            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
-
             // try alternative order of completions
             fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
             fakeCall.ReceivedMessageHandler(true, null);
@@ -199,15 +196,35 @@ namespace Grpc.Core.Internal.Tests
             var responseStream = new ClientResponseStream<string, string>(asyncCall);
             var readTask = responseStream.MoveNext();
 
-            fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
-            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
-
             fakeCall.ReceivedMessageHandler(false, null);  // after a failed read, we rely on C core to deliver appropriate status code.
             fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
 
             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
         }
 
+        [Test]
+        public void ServerStreaming_MoreResponses_Success()
+        {
+            asyncCall.StartServerStreamingCall("request1");
+            var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+            var readTask1 = responseStream.MoveNext();
+            fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+            Assert.IsTrue(readTask1.Result);
+            Assert.AreEqual("response1", responseStream.Current);
+
+            var readTask2 = responseStream.MoveNext();
+            fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+            Assert.IsTrue(readTask2.Result);
+            Assert.AreEqual("response1", responseStream.Current);
+
+            var readTask3 = responseStream.MoveNext();
+            fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+            fakeCall.ReceivedMessageHandler(true, null);
+
+            AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
+        }
+
         ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
         {
             return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
@@ -236,7 +253,6 @@ namespace Grpc.Core.Internal.Tests
 
             Assert.IsFalse(moveNextTask.Result);
             Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
-            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
         }
 
@@ -259,7 +275,6 @@ namespace Grpc.Core.Internal.Tests
 
             var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
             Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
-            Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
         }
 
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 50ba617cdb1727524f6ed3727576cf6070a4697a..f522174bd0f9fe4748fd3b0fcb42e99ec1e39631 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
 
         /// <summary>
         /// Receives a streaming response. Only one pending read action is allowed at any given time.
-        /// completionDelegate is called when the operation finishes.
         /// </summary>
-        public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+        public Task<TResponse> ReadMessageAsync()
         {
-            StartReadMessageInternal(completionDelegate);
+            return ReadMessageInternalAsync();
         }
 
         /// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 877b997aba094a88a5438afdd4b505a39105398d..abacfabadb6a7ba9930a8c7c25d0e4cbe46d034a 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,7 +68,7 @@ namespace Grpc.Core.Internal
         protected bool cancelRequested;
 
         protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
-        protected AsyncCompletionDelegate<TRead> readCompletionDelegate;  // Completion of a pending send or sendclose if not null.
+        protected TaskCompletionSource<TRead> streamingReadTcs;  // Completion of a pending streaming read if not null.
 
         protected bool readingDone;  // True if last read (i.e. read with null payload) was already received.
         protected bool halfcloseRequested;  // True if send close have been initiated.
@@ -150,15 +150,25 @@ namespace Grpc.Core.Internal
         /// Initiates reading a message. Only one read operation can be active at a time.
         /// completionDelegate is invoked upon completion.
         /// </summary>
-        protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
+        protected Task<TRead> ReadMessageInternalAsync()
         {
             lock (myLock)
             {
-                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
                 CheckReadingAllowed();
+                if (readingDone)
+                {
+                    // the last read that returns null or throws an exception is idempotent
+                    // and maintain its state.
+                    GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
+                    return streamingReadTcs.Task;
+                }
+
+                GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
+                GrpcPreconditions.CheckState(!disposed);
 
                 call.StartReceiveMessage(HandleReadFinished);
-                readCompletionDelegate = completionDelegate;
+                streamingReadTcs = new TaskCompletionSource<TRead>();
+                return streamingReadTcs.Task;
             }
         }
 
@@ -216,10 +226,6 @@ namespace Grpc.Core.Internal
         protected virtual void CheckReadingAllowed()
         {
             GrpcPreconditions.CheckState(started);
-            GrpcPreconditions.CheckState(!disposed);
-
-            GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
-            GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
         }
 
         protected void CheckNotCancelled()
@@ -353,12 +359,10 @@ namespace Grpc.Core.Internal
             TRead msg = default(TRead);
             var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
 
-            AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
+            TaskCompletionSource<TRead> origTcs = null;
             lock (myLock)
             {
-                origCompletionDelegate = readCompletionDelegate;
-                readCompletionDelegate = null;
-
+                origTcs = streamingReadTcs;
                 if (receivedMessage == null)
                 {
                     // This was the last read.
@@ -368,18 +372,25 @@ namespace Grpc.Core.Internal
                 if (deserializeException != null && IsClient)
                 {
                     readingDone = true;
+
+                    // TODO(jtattermusch): it might be too late to set the status
                     CancelWithStatus(DeserializeResponseFailureStatus);
                 }
 
+                if (!readingDone)
+                {
+                    streamingReadTcs = null;
+                }
+
                 ReleaseResourcesIfPossible();
             }
 
             if (deserializeException != null && !IsClient)
             {
-                FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+                origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
                 return;
             }
-            FireCompletion(origCompletionDelegate, msg, null);
+            origTcs.SetResult(msg);
         }
     }
 }
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index bea2b3660c3293469f7e789544ed7266305161be..cce480b2c4a0173642910302e03ceea3e6d7300d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
 
         /// <summary>
         /// Receives a streaming request. Only one pending read action is allowed at any given time.
-        /// completionDelegate is called when the operation finishes.
         /// </summary>
-        public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+        public Task<TRequest> ReadMessageAsync()
         {
-            StartReadMessageInternal(completionDelegate);
+            return ReadMessageInternalAsync();
         }
 
         /// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index d6e34a0f04431602230284f2185d002ed9d65db4..ad9423ff58c9965d31c62f299beb321a83340e01 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
             {
                 throw new InvalidOperationException("Cancellation of individual reads is not supported.");
             }
-            var taskSource = new AsyncCompletionTaskSource<TResponse>();
-            call.StartReadMessage(taskSource.CompletionDelegate);
-            var result = await taskSource.Task.ConfigureAwait(false);
+            var result = await call.ReadMessageAsync().ConfigureAwait(false);
             this.current = result;
 
             if (result == null)
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index e7be82c31857e44e3830fc52a647cfbb537fd50c..d76030d1addde87781d8e9183637eef47d383f12 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
             {
                 throw new InvalidOperationException("Cancellation of individual reads is not supported.");
             }
-            var taskSource = new AsyncCompletionTaskSource<TRequest>();
-            call.StartReadMessage(taskSource.CompletionDelegate);
-            var result = await taskSource.Task.ConfigureAwait(false);
+            var result = await call.ReadMessageAsync().ConfigureAwait(false);
             this.current = result;
             return result != null;
         }