From 97e294aadc251c0fac5b417e55f207cedb10561e Mon Sep 17 00:00:00 2001
From: Jan Tattermusch <jtattermusch@google.com>
Date: Thu, 23 Apr 2015 14:30:59 -0700
Subject: [PATCH] Removing some ugly hacks from C# server and improving the
 comments a bit

---
 .../Grpc.Core/Internal/CallSafeHandle.cs      |   3 +-
 .../Grpc.Core/Internal/ServerSafeHandle.cs    |  10 +-
 src/csharp/Grpc.Core/Server.cs                | 176 +++++++++---------
 .../Grpc.Examples.MathServer/MathServer.cs    |   2 +-
 .../Grpc.IntegrationTesting.csproj            |   4 +-
 5 files changed, 107 insertions(+), 88 deletions(-)

diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 14add60c72..c97a3bc2b1 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -33,6 +33,7 @@ using System;
 using System.Diagnostics;
 using System.Runtime.InteropServices;
 using Grpc.Core;
+using Grpc.Core.Utils;
 
 namespace Grpc.Core.Internal
 {
@@ -180,7 +181,7 @@ namespace Grpc.Core.Internal
 
         private static void AssertCallOk(GRPCCallError callError)
         {
-            Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+            Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
         }
 
         private static uint GetFlags(bool buffered)
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index a59da09822..8080643d8c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -35,6 +35,7 @@ using System;
 using System.Collections.Concurrent;
 using System.Diagnostics;
 using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
 
 namespace Grpc.Core.Internal
 {
@@ -105,9 +106,9 @@ namespace Grpc.Core.Internal
             grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
         }
 
-        public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+        public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
         {
-            return grpcsharp_server_request_call(this, cq, callback);
+            AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
         }
 
         protected override bool ReleaseHandle()
@@ -115,5 +116,10 @@ namespace Grpc.Core.Internal
             grpcsharp_server_destroy(handle);
             return true;
         }
+
+        private static void AssertCallOk(GRPCCallError callError)
+        {
+            Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+        }
     }
 }
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index f086fa8beb..e686cdddef 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -38,27 +38,29 @@ using System.Diagnostics;
 using System.Runtime.InteropServices;
 using System.Threading.Tasks;
 using Grpc.Core.Internal;
+using Grpc.Core.Utils;
 
 namespace Grpc.Core
 {
     /// <summary>
-    /// Server is implemented only to be able to do
-    /// in-process testing.
+    /// A gRPC server.
     /// </summary>
     public class Server
     {
-        // TODO: make sure the delegate doesn't get garbage collected while
+        // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
         // native callbacks are in the completion queue.
         readonly ServerShutdownCallbackDelegate serverShutdownHandler;
         readonly CompletionCallbackDelegate newServerRpcHandler;
 
-        readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
         readonly ServerSafeHandle handle;
+        readonly object myLock = new object();
 
         readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
-
         readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
 
+        bool startRequested;
+        bool shutdownRequested;
+
         public Server()
         {
             this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
@@ -66,71 +68,81 @@ namespace Grpc.Core
             this.serverShutdownHandler = HandleServerShutdown;
         }
 
-        // only call this before Start()
+        /// <summary>
+        /// Adds a service definition to the server. This is how you register
+        /// handlers for a service with the server.
+        /// Only call this before Start().
+        /// </summary>
         public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
         {
-            foreach (var entry in serviceDefinition.CallHandlers)
+            lock (myLock)
             {
-                callHandlers.Add(entry.Key, entry.Value);
+                Preconditions.CheckState(!startRequested);
+                foreach (var entry in serviceDefinition.CallHandlers)
+                {
+                    callHandlers.Add(entry.Key, entry.Value);
+                }
             }
         }
 
-        // only call before Start()
+        /// <summary>
+        /// Add a non-secure port on which server should listen.
+        /// Only call this before Start().
+        /// </summary>
         public int AddListeningPort(string addr)
         {
-            return handle.AddListeningPort(addr);
-        }
-
-        // only call before Start()
-        public int AddListeningPort(string addr, ServerCredentials credentials)
-        {
-            using (var nativeCredentials = credentials.ToNativeCredentials())
+            lock (myLock)
             {
-                return handle.AddListeningPort(addr, nativeCredentials);
+                Preconditions.CheckState(!startRequested);
+                return handle.AddListeningPort(addr);
             }
         }
 
-        public void Start()
-        {
-            handle.Start();
-
-            // TODO: this basically means the server is single threaded....
-            StartHandlingRpcs();
-        }
-
         /// <summary>
-        /// Requests and handles single RPC call.
+        /// Add a secure port on which server should listen.
+        /// Only call this before Start().
         /// </summary>
-        internal void RunRpc()
+        public int AddListeningPort(string addr, ServerCredentials credentials)
         {
-            AllowOneRpc();
-
-            try
+            lock (myLock)
             {
-                var rpcInfo = newRpcQueue.Take();
-
-                // Console.WriteLine("Server received RPC " + rpcInfo.Method);
-
-                IServerCallHandler callHandler;
-                if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
+                Preconditions.CheckState(!startRequested);
+                using (var nativeCredentials = credentials.ToNativeCredentials())
                 {
-                    callHandler = new NoSuchMethodCallHandler();
+                    return handle.AddListeningPort(addr, nativeCredentials);
                 }
-                callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
             }
-            catch (Exception e)
+        }
+
+        /// <summary>
+        /// Starts the server.
+        /// </summary>
+        public void Start()
+        {
+            lock (myLock)
             {
-                Console.WriteLine("Exception while handling RPC: " + e);
+                Preconditions.CheckState(!startRequested);
+                startRequested = true;
+                
+                handle.Start();
+                AllowOneRpc();
             }
         }
 
         /// <summary>
         /// Requests server shutdown and when there are no more calls being serviced,
-        /// cleans up used resources.
+        /// cleans up used resources. The returned task finishes when shutdown procedure
+        /// is complete.
         /// </summary>
-        /// <returns>The async.</returns>
         public async Task ShutdownAsync()
         {
+            lock (myLock)
+            {
+                Preconditions.CheckState(startRequested);
+                Preconditions.CheckState(!shutdownRequested);
+                shutdownRequested = true;
+            }
+
             handle.ShutdownAndNotify(serverShutdownHandler);
             await shutdownTcs.Task;
             handle.Dispose();
@@ -152,19 +164,43 @@ namespace Grpc.Core
             handle.Dispose();
         }
 
-        private async Task StartHandlingRpcs()
+        /// <summary>
+        /// Allows one new RPC call to be received by server.
+        /// </summary>
+        private void AllowOneRpc()
         {
-            while (true)
+            lock (myLock)
             {
-                await Task.Factory.StartNew(RunRpc);
+                if (!shutdownRequested)
+                {
+                    handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
+                }
             }
         }
 
-        private void AllowOneRpc()
+        /// <summary>
+        /// Selects corresponding handler for given call and handles the call.
+        /// </summary>
+        private void InvokeCallHandler(CallSafeHandle call, string method)
         {
-            AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
+            try
+            {
+                IServerCallHandler callHandler;
+                if (!callHandlers.TryGetValue(method, out callHandler))
+                {
+                    callHandler = new NoSuchMethodCallHandler();
+                }
+                callHandler.StartCall(method, call, GetCompletionQueue());
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine("Exception while handling RPC: " + e);
+            }
         }
 
+        /// <summary>
+        /// Handles the native callback.
+        /// </summary>
         private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
         {
             try
@@ -176,13 +212,16 @@ namespace Grpc.Core
                     // TODO: handle error
                 }
 
-                var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
+                CallSafeHandle call = ctx.GetServerRpcNewCall();
+                string method = ctx.GetServerRpcNewMethod();
 
                 // after server shutdown, the callback returns with null call
-                if (!rpcInfo.Call.IsInvalid)
+                if (!call.IsInvalid)
                 {
-                    newRpcQueue.Add(rpcInfo);
+                    Task.Run(() => InvokeCallHandler(call, method));
                 }
+
+                AllowOneRpc();
             }
             catch (Exception e)
             {
@@ -190,6 +229,10 @@ namespace Grpc.Core
             }
         }
 
+        /// <summary>
+        /// Handles native callback.
+        /// </summary>
+        /// <param name="eventPtr"></param>
         private void HandleServerShutdown(IntPtr eventPtr)
         {
             try
@@ -202,42 +245,9 @@ namespace Grpc.Core
             }
         }
 
-        private static void AssertCallOk(GRPCCallError callError)
-        {
-            Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
-        }
-
         private static CompletionQueueSafeHandle GetCompletionQueue()
         {
             return GrpcEnvironment.ThreadPool.CompletionQueue;
         }
-
-        private struct NewRpcInfo
-        {
-            private CallSafeHandle call;
-            private string method;
-
-            public NewRpcInfo(CallSafeHandle call, string method)
-            {
-                this.call = call;
-                this.method = method;
-            }
-
-            public CallSafeHandle Call
-            {
-                get
-                {
-                    return this.call;
-                }
-            }
-
-            public string Method
-            {
-                get
-                {
-                    return this.method;
-                }
-            }
-        }
     }
 }
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index 884a84d0a6..c26763e622 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -40,7 +40,7 @@ namespace math
     {
         public static void Main(string[] args)
         {
-            String host = "0.0.0.0";
+            string host = "0.0.0.0";
 
             GrpcEnvironment.Initialize();
 
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 584bf1068d..6ae8041fb7 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -80,5 +80,7 @@
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
     </None>
   </ItemGroup>
-  <ItemGroup />
+  <ItemGroup>
+    <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
+  </ItemGroup>
 </Project>
\ No newline at end of file
-- 
GitLab