Skip to content
Snippets Groups Projects
Commit 84e65e1f authored by Jan Tattermusch's avatar Jan Tattermusch Committed by GitHub
Browse files

Merge pull request #8713 from jtattermusch/csharp_cleanup_profiling

Cleanup C# profiling code
parents 76f90c39 b35dfa83
No related branches found
No related tags found
No related merge requests found
...@@ -388,35 +388,29 @@ namespace Grpc.Core.Internal ...@@ -388,35 +388,29 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq) private void Initialize(CompletionQueueSafeHandle cq)
{ {
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize")) var call = CreateNativeCall(cq);
{
var call = CreateNativeCall(cq);
details.Channel.AddCallReference(this); details.Channel.AddCallReference(this);
InitializeInternal(call); InitializeInternal(call);
RegisterCancellationCallback(); RegisterCancellationCallback();
}
} }
private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
{ {
using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall")) if (injectedNativeCall != null)
{ {
if (injectedNativeCall != null) return injectedNativeCall; // allows injecting a mock INativeCall in tests.
{ }
return injectedNativeCall; // allows injecting a mock INativeCall in tests.
}
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var credentials = details.Options.Credentials; var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null) using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{ {
var result = details.Channel.Handle.CreateCall( var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq, parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials); details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result; return result;
}
} }
} }
...@@ -456,47 +450,44 @@ namespace Grpc.Core.Internal ...@@ -456,47 +450,44 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true. // success will be always set to true.
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
lock (myLock)
{ {
TaskCompletionSource<object> delayedStreamingWriteTcs = null; finished = true;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
lock (myLock) if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{ {
finished = true; receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible();
} }
finishedStatus = receivedStatus;
responseHeadersTcs.SetResult(responseHeaders); if (isStreamingWriteCompletionDelayed)
if (delayedStreamingWriteTcs != null)
{ {
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
} }
var status = receivedStatus.Status; ReleaseResourcesIfPossible();
if (status.StatusCode != StatusCode.OK) }
{
unaryResponseTcs.SetException(new RpcException(status)); responseHeadersTcs.SetResult(responseHeaders);
return;
}
unaryResponseTcs.SetResult(msg); if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
} }
unaryResponseTcs.SetResult(msg);
} }
/// <summary> /// <summary>
......
...@@ -181,19 +181,16 @@ namespace Grpc.Core.Internal ...@@ -181,19 +181,16 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
protected bool ReleaseResourcesIfPossible() protected bool ReleaseResourcesIfPossible()
{ {
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible")) if (!disposed && call != null)
{ {
if (!disposed && call != null) bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{ {
bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); ReleaseResources();
if (noMoreSendCompletions && readingDone && finished) return true;
{
ReleaseResources();
return true;
}
} }
return false;
} }
return false;
} }
protected abstract bool IsClient protected abstract bool IsClient
...@@ -229,28 +226,20 @@ namespace Grpc.Core.Internal ...@@ -229,28 +226,20 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg) protected byte[] UnsafeSerialize(TWrite msg)
{ {
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize")) return serializer(msg);
{
return serializer(msg);
}
} }
protected Exception TryDeserialize(byte[] payload, out TRead msg) protected Exception TryDeserialize(byte[] payload, out TRead msg)
{ {
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize")) try
{ {
try msg = deserializer(payload);
{ return null;
}
msg = deserializer(payload); catch (Exception e)
return null; {
msg = default(TRead);
} return e;
catch (Exception e)
{
msg = default(TRead);
return e;
}
} }
} }
......
...@@ -76,11 +76,8 @@ namespace Grpc.Core.Internal ...@@ -76,11 +76,8 @@ namespace Grpc.Core.Internal
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{ {
using (Profilers.ForCurrentThread().NewScope("CallSafeHandle.StartUnary")) Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
{ .CheckOk();
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
} }
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
......
...@@ -65,16 +65,13 @@ namespace Grpc.Core.Internal ...@@ -65,16 +65,13 @@ namespace Grpc.Core.Internal
public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials) public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{ {
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall")) var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
if (credentials != null)
{ {
var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); result.SetCredentials(credentials);
if (credentials != null)
{
result.SetCredentials(credentials);
}
result.Initialize(cq);
return result;
} }
result.Initialize(cq);
return result;
} }
public ChannelState CheckConnectivityState(bool tryToConnect) public ChannelState CheckConnectivityState(bool tryToConnect)
......
...@@ -70,10 +70,7 @@ namespace Grpc.Core.Internal ...@@ -70,10 +70,7 @@ namespace Grpc.Core.Internal
public CompletionQueueEvent Pluck(IntPtr tag) public CompletionQueueEvent Pluck(IntPtr tag)
{ {
using (Profilers.ForCurrentThread().NewScope("CompletionQueueSafeHandle.Pluck")) return Native.grpcsharp_completion_queue_pluck(this, tag);
{
return Native.grpcsharp_completion_queue_pluck(this, tag);
}
} }
/// <summary> /// <summary>
......
...@@ -37,6 +37,7 @@ using System.Linq; ...@@ -37,6 +37,7 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Grpc.Core.Logging; using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils; using Grpc.Core.Utils;
namespace Grpc.Core.Internal namespace Grpc.Core.Internal
...@@ -54,6 +55,8 @@ namespace Grpc.Core.Internal ...@@ -54,6 +55,8 @@ namespace Grpc.Core.Internal
readonly int poolSize; readonly int poolSize;
readonly int completionQueueCount; readonly int completionQueueCount;
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
bool stopRequested; bool stopRequested;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues; IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
...@@ -82,7 +85,8 @@ namespace Grpc.Core.Internal ...@@ -82,7 +85,8 @@ namespace Grpc.Core.Internal
for (int i = 0; i < poolSize; i++) for (int i = 0; i < poolSize; i++)
{ {
threads.Add(CreateAndStartThread(i)); var optionalProfiler = i < threadProfilers.Count ? threadProfilers[i] : null;
threads.Add(CreateAndStartThread(i, optionalProfiler));
} }
} }
} }
...@@ -111,6 +115,11 @@ namespace Grpc.Core.Internal ...@@ -111,6 +115,11 @@ namespace Grpc.Core.Internal
{ {
cq.Dispose(); cq.Dispose();
} }
for (int i = 0; i < threadProfilers.Count; i++)
{
threadProfilers[i].Dump(string.Format("grpc_trace_thread_{0}.txt", i));
}
}); });
} }
...@@ -137,12 +146,12 @@ namespace Grpc.Core.Internal ...@@ -137,12 +146,12 @@ namespace Grpc.Core.Internal
} }
} }
private Thread CreateAndStartThread(int threadIndex) private Thread CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)
{ {
var cqIndex = threadIndex % completionQueues.Count; var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex); var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq))); var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq, optionalProfiler)));
thread.IsBackground = true; thread.IsBackground = true;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex); thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start(); thread.Start();
...@@ -153,8 +162,13 @@ namespace Grpc.Core.Internal ...@@ -153,8 +162,13 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Body of the polling thread. /// Body of the polling thread.
/// </summary> /// </summary>
private void RunHandlerLoop(CompletionQueueSafeHandle cq) private void RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)
{ {
if (optionalProfiler != null)
{
Profilers.SetForCurrentThread(optionalProfiler);
}
CompletionQueueEvent ev; CompletionQueueEvent ev;
do do
{ {
......
...@@ -48,22 +48,19 @@ namespace Grpc.Core.Internal ...@@ -48,22 +48,19 @@ namespace Grpc.Core.Internal
public static MetadataArraySafeHandle Create(Metadata metadata) public static MetadataArraySafeHandle Create(Metadata metadata)
{ {
using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create")) if (metadata.Count == 0)
{ {
if (metadata.Count == 0) return new MetadataArraySafeHandle();
{ }
return new MetadataArraySafeHandle();
}
// TODO(jtattermusch): we might wanna check that the metadata is readonly // TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++) for (int i = 0; i < metadata.Count; i++)
{ {
var valueBytes = metadata[i].GetSerializedValueUnsafe(); var valueBytes = metadata[i].GetSerializedValueUnsafe();
Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
} }
return metadataArray;
} }
/// <summary> /// <summary>
......
...@@ -80,7 +80,7 @@ namespace Grpc.Core.Profiling ...@@ -80,7 +80,7 @@ namespace Grpc.Core.Profiling
ProfilerEntry[] entries; ProfilerEntry[] entries;
int count; int count;
public BasicProfiler() : this(1024*1024) public BasicProfiler() : this(20*1024*1024)
{ {
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment