Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
G
Grpc
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
tci-gateway-module
Grpc
Commits
5c004c68
Commit
5c004c68
authored
10 years ago
by
Craig Tiller
Browse files
Options
Downloads
Patches
Plain Diff
Driver changes
WIP - things compile again after a broad set of changes preparing for the driver code.
parent
b0a32fc7
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
test/cpp/qps/client.cc
+80
-97
80 additions, 97 deletions
test/cpp/qps/client.cc
test/cpp/qps/qpstest.proto
+34
-12
34 additions, 12 deletions
test/cpp/qps/qpstest.proto
test/cpp/qps/server.cc
+64
-18
64 additions, 18 deletions
test/cpp/qps/server.cc
with
178 additions
and
127 deletions
test/cpp/qps/client.cc
+
80
−
97
View file @
5c004c68
...
@@ -33,48 +33,36 @@
...
@@ -33,48 +33,36 @@
#include
<cassert>
#include
<cassert>
#include
<memory>
#include
<memory>
#include
<mutex>
#include
<string>
#include
<string>
#include
<thread>
#include
<thread>
#include
<vector>
#include
<vector>
#include
<sstream>
#include
<sstream>
#include
<sys/signal.h>
#include
<grpc/grpc.h>
#include
<grpc/grpc.h>
#include
<grpc/support/alloc.h>
#include
<grpc/support/histogram.h>
#include
<grpc/support/histogram.h>
#include
<grpc/support/log.h>
#include
<grpc/support/log.h>
#include
<grpc/support/host_port.h>
#include
<gflags/gflags.h>
#include
<gflags/gflags.h>
#include
<grpc++/client_context.h>
#include
<grpc++/client_context.h>
#include
<grpc++/status.h>
#include
<grpc++/status.h>
#include
<grpc++/server.h>
#include
<grpc++/server_builder.h>
#include
"test/core/util/grpc_profiler.h"
#include
"test/core/util/grpc_profiler.h"
#include
"test/cpp/util/create_test_channel.h"
#include
"test/cpp/util/create_test_channel.h"
#include
"test/cpp/qps/qpstest.pb.h"
#include
"test/cpp/qps/qpstest.pb.h"
DEFINE_bool
(
enable_ssl
,
false
,
"Whether to use ssl/tls."
);
DEFINE_int32
(
driver_port
,
0
,
"Client driver port."
);
DEFINE_int32
(
server_port
,
0
,
"Server port."
);
DEFINE_string
(
server_host
,
"127.0.0.1"
,
"Server host."
);
DEFINE_int32
(
client_threads
,
4
,
"Number of client threads."
);
// We have a configurable number of channels for sending RPCs.
// RPCs are sent round-robin on the available channels by the
// various threads. Interesting cases are 1 global channel or
// 1 per-thread channel, but we can support any number.
// The channels are assigned round-robin on an RPC by RPC basis
// rather than just at initialization time in order to also measure the
// impact of cache thrashing caused by channel changes. This is an issue
// if you are not in one of the above "interesting cases"
DEFINE_int32
(
client_channels
,
4
,
"Number of client channels."
);
DEFINE_int32
(
num_rpcs
,
1000
,
"Number of RPCs per thread."
);
DEFINE_int32
(
payload_size
,
1
,
"Payload size in bytes"
);
// Alternatively, specify parameters for test as a workload so that multiple
// tests are initiated back-to-back. This is convenient for keeping a borg
// allocation consistent. This is a space-separated list of
// [threads channels num_rpcs payload_size ]*
DEFINE_string
(
workload
,
""
,
"Workload parameters"
);
using
grpc
::
ChannelInterface
;
using
grpc
::
ChannelInterface
;
using
grpc
::
CreateTestChannel
;
using
grpc
::
CreateTestChannel
;
using
grpc
::
testing
::
ServerStats
;
using
grpc
::
ServerBuilder
;
using
grpc
::
testing
::
ClientArgs
;
using
grpc
::
testing
::
ClientResult
;
using
grpc
::
testing
::
QpsClient
;
using
grpc
::
testing
::
SimpleRequest
;
using
grpc
::
testing
::
SimpleRequest
;
using
grpc
::
testing
::
SimpleResponse
;
using
grpc
::
testing
::
SimpleResponse
;
using
grpc
::
testing
::
StatsRequest
;
using
grpc
::
testing
::
StatsRequest
;
...
@@ -92,8 +80,11 @@ static double now() {
...
@@ -92,8 +80,11 @@ static double now() {
return
1e9
*
tv
.
tv_sec
+
tv
.
tv_nsec
;
return
1e9
*
tv
.
tv_sec
+
tv
.
tv_nsec
;
}
}
void
RunTest
(
const
int
client_threads
,
const
int
client_channels
,
static
bool
got_sigint
=
false
;
const
int
num_rpcs
,
const
int
payload_size
)
{
static
void
sigint_handler
(
int
x
)
{
got_sigint
=
1
;
}
ClientResult
RunTest
(
const
ClientArgs
&
args
)
{
gpr_log
(
GPR_INFO
,
gpr_log
(
GPR_INFO
,
"QPS test with parameters
\n
"
"QPS test with parameters
\n
"
"enable_ssl = %d
\n
"
"enable_ssl = %d
\n
"
...
@@ -101,17 +92,14 @@ void RunTest(const int client_threads, const int client_channels,
...
@@ -101,17 +92,14 @@ void RunTest(const int client_threads, const int client_channels,
"client_threads = %d
\n
"
"client_threads = %d
\n
"
"num_rpcs = %d
\n
"
"num_rpcs = %d
\n
"
"payload_size = %d
\n
"
"payload_size = %d
\n
"
"server_host:server_port = %s:%d
\n\n
"
,
"server_target = %s
\n\n
"
,
FLAGS_enable_ssl
,
client_channels
,
client_threads
,
num_rpcs
,
args
.
enable_ssl
(),
args
.
client_channels
(),
args
.
client_threads
(),
args
.
num_rpcs
(),
payload_size
,
FLAGS_server_host
.
c_str
(),
FLAGS_server_port
);
args
.
payload_size
(),
args
.
server_target
().
c_str
());
std
::
ostringstream
oss
;
oss
<<
FLAGS_server_host
<<
":"
<<
FLAGS_server_port
;
class
ClientChannelInfo
{
class
ClientChannelInfo
{
public:
public:
explicit
ClientChannelInfo
(
const
grpc
::
string
&
server
)
explicit
ClientChannelInfo
(
const
ClientArgs
&
args
)
:
channel_
(
CreateTestChannel
(
server
,
FLAGS_
enable_ssl
)),
:
channel_
(
CreateTestChannel
(
args
.
server
_target
(),
args
.
enable_ssl
()
)),
stub_
(
TestService
::
NewStub
(
channel_
))
{}
stub_
(
TestService
::
NewStub
(
channel_
))
{}
ChannelInterface
*
get_channel
()
{
return
channel_
.
get
();
}
ChannelInterface
*
get_channel
()
{
return
channel_
.
get
();
}
TestService
::
Stub
*
get_stub
()
{
return
stub_
.
get
();
}
TestService
::
Stub
*
get_stub
()
{
return
stub_
.
get
();
}
...
@@ -122,38 +110,33 @@ void RunTest(const int client_threads, const int client_channels,
...
@@ -122,38 +110,33 @@ void RunTest(const int client_threads, const int client_channels,
};
};
std
::
vector
<
ClientChannelInfo
>
channels
;
std
::
vector
<
ClientChannelInfo
>
channels
;
for
(
int
i
=
0
;
i
<
client_channels
;
i
++
)
{
for
(
int
i
=
0
;
i
<
args
.
client_channels
()
;
i
++
)
{
channels
.
push_back
(
ClientChannelInfo
(
oss
.
str
()
));
channels
.
push_back
(
ClientChannelInfo
(
args
));
}
}
std
::
vector
<
std
::
thread
>
threads
;
// Will add threads when ready to execute
std
::
vector
<
std
::
thread
>
threads
;
// Will add threads when ready to execute
std
::
vector
<
::
gpr_histogram
*>
thread_stats
(
client_threads
);
std
::
vector
<
::
gpr_histogram
*>
thread_stats
(
args
.
client_threads
()
);
TestService
::
Stub
*
stub_stats
=
channels
[
0
].
get_stub
();
grpc
::
ClientContext
context_stats_begin
;
grpc
::
ClientContext
context_stats_begin
;
StatsRequest
stats_request
;
ServerStats
server_stats_begin
;
stats_request
.
set_test_num
(
0
);
grpc
::
Status
status_beg
=
stub_stats
->
CollectServerStats
(
&
context_stats_begin
,
stats_request
,
&
server_stats_begin
);
grpc_profiler_start
(
"qps_client.prof"
);
grpc_profiler_start
(
"qps_client.prof"
);
for
(
int
i
=
0
;
i
<
client_threads
;
i
++
)
{
gpr_timespec
start
=
gpr_now
();
for
(
int
i
=
0
;
i
<
args
.
client_threads
();
i
++
)
{
gpr_histogram
*
hist
=
gpr_histogram_create
(
0.01
,
60e9
);
gpr_histogram
*
hist
=
gpr_histogram_create
(
0.01
,
60e9
);
GPR_ASSERT
(
hist
!=
NULL
);
GPR_ASSERT
(
hist
!=
NULL
);
thread_stats
[
i
]
=
hist
;
thread_stats
[
i
]
=
hist
;
threads
.
push_back
(
threads
.
push_back
(
std
::
thread
([
hist
,
client_threads
,
client_channels
,
num_rpcs
,
std
::
thread
([
hist
,
args
,
&
channels
](
int
channel_num
)
{
payload_size
,
&
channels
](
int
channel_num
)
{
SimpleRequest
request
;
SimpleRequest
request
;
SimpleResponse
response
;
SimpleResponse
response
;
request
.
set_response_type
(
request
.
set_response_type
(
grpc
::
testing
::
PayloadType
::
COMPRESSABLE
);
grpc
::
testing
::
PayloadType
::
COMPRESSABLE
);
request
.
set_response_size
(
payload_size
);
request
.
set_response_size
(
args
.
payload_size
()
);
for
(
int
j
=
0
;
j
<
num_rpcs
;
j
++
)
{
for
(
int
j
=
0
;
j
<
args
.
num_rpcs
()
;
j
++
)
{
TestService
::
Stub
*
stub
=
TestService
::
Stub
*
stub
=
channels
[
channel_num
].
get_stub
();
channels
[
channel_num
].
get_stub
();
double
start
=
now
();
double
start
=
now
();
...
@@ -166,26 +149,29 @@ void RunTest(const int client_threads, const int client_channels,
...
@@ -166,26 +149,29 @@ void RunTest(const int client_threads, const int client_channels,
(
response
.
payload
().
type
()
==
(
response
.
payload
().
type
()
==
grpc
::
testing
::
PayloadType
::
COMPRESSABLE
)
&&
grpc
::
testing
::
PayloadType
::
COMPRESSABLE
)
&&
(
response
.
payload
().
body
().
length
()
==
(
response
.
payload
().
body
().
length
()
==
static_cast
<
size_t
>
(
payload_size
)));
static_cast
<
size_t
>
(
args
.
payload_size
()
)));
// Now do runtime round-robin assignment of the next
// Now do runtime round-robin assignment of the next
// channel number
// channel number
channel_num
+=
client_threads
;
channel_num
+=
args
.
client_threads
()
;
channel_num
%=
client_channels
;
channel_num
%=
args
.
client_channels
()
;
}
}
},
},
i
%
client_channels
));
i
%
args
.
client_channels
()
));
}
}
gpr_histogram
*
hist
=
gpr_histogram_create
(
0.01
,
60e9
);
GPR_ASSERT
(
hist
!=
NULL
);
for
(
auto
&
t
:
threads
)
{
for
(
auto
&
t
:
threads
)
{
t
.
join
();
t
.
join
();
}
}
gpr_timespec
stop
=
gpr_now
();
grpc_profiler_stop
();
grpc_profiler_stop
();
for
(
int
i
=
0
;
i
<
client_threads
;
i
++
)
{
gpr_histogram
*
hist
=
gpr_histogram_create
(
0.01
,
60e9
);
GPR_ASSERT
(
hist
!=
NULL
);
for
(
int
i
=
0
;
i
<
args
.
client_threads
();
i
++
)
{
gpr_histogram
*
h
=
thread_stats
[
i
];
gpr_histogram
*
h
=
thread_stats
[
i
];
gpr_log
(
GPR_INFO
,
"latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f"
,
gpr_log
(
GPR_INFO
,
"latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f"
,
i
,
gpr_histogram_percentile
(
h
,
50
),
gpr_histogram_percentile
(
h
,
90
),
i
,
gpr_histogram_percentile
(
h
,
50
),
gpr_histogram_percentile
(
h
,
90
),
...
@@ -195,57 +181,54 @@ void RunTest(const int client_threads, const int client_channels,
...
@@ -195,57 +181,54 @@ void RunTest(const int client_threads, const int client_channels,
gpr_histogram_destroy
(
h
);
gpr_histogram_destroy
(
h
);
}
}
gpr_log
(
ClientResult
result
;
GPR_INFO
,
auto
*
latencies
=
result
.
mutable_latencies
();
"latency across %d threads with %d channels and %d payload "
latencies
->
set_l_50
(
gpr_histogram_percentile
(
hist
,
50
));
"(50/90/95/99/99.9): %f / %f / %f / %f / %f"
,
latencies
->
set_l_90
(
gpr_histogram_percentile
(
hist
,
90
));
client_threads
,
client_channels
,
payload_size
,
latencies
->
set_l_99
(
gpr_histogram_percentile
(
hist
,
99
));
gpr_histogram_percentile
(
hist
,
50
),
gpr_histogram_percentile
(
hist
,
90
),
latencies
->
set_l_999
(
gpr_histogram_percentile
(
hist
,
99.9
));
gpr_histogram_percentile
(
hist
,
95
),
gpr_histogram_percentile
(
hist
,
99
),
gpr_timespec
elapsed
=
gpr_time_sub
(
stop
,
start
);
gpr_histogram_percentile
(
hist
,
99.9
));
result
.
set_num_rpcs
(
args
.
client_threads
()
*
args
.
num_rpcs
());
result
.
set_time_elapsed
(
elapsed
.
tv_sec
+
1e-9
*
elapsed
.
tv_nsec
);
gpr_histogram_destroy
(
hist
);
gpr_histogram_destroy
(
hist
);
grpc
::
ClientContext
context_stats_end
;
return
result
;
ServerStats
server_stats_end
;
}
grpc
::
Status
status_end
=
stub_stats
->
CollectServerStats
(
&
context_stats_end
,
stats_request
,
&
server_stats_end
);
double
elapsed
=
server_stats_end
.
time_now
()
-
server_stats_begin
.
time_now
();
class
ClientImpl
:
public
QpsClient
::
Service
{
int
total_rpcs
=
client_threads
*
num_rpcs
;
public:
double
utime
=
server_stats_end
.
time_user
()
-
server_stats_begin
.
time_user
();
double
stime
=
private:
server_stats_end
.
time_system
()
-
server_stats_begin
.
time_system
();
std
::
mutex
client_mu_
;
gpr_log
(
GPR_INFO
,
};
"Elapsed time: %.3f
\n
"
"RPC Count: %d
\n
"
static
void
RunServer
()
{
"QPS: %.3f
\n
"
char
*
server_address
=
NULL
;
"System time: %.3f
\n
"
gpr_join_host_port
(
&
server_address
,
"::"
,
FLAGS_driver_port
);
"User time: %.3f
\n
"
"Resource usage: %.1f%%
\n
"
,
ClientImpl
service
;
elapsed
,
total_rpcs
,
total_rpcs
/
elapsed
,
stime
,
utime
,
(
stime
+
utime
)
/
elapsed
*
100.0
);
ServerBuilder
builder
;
builder
.
AddPort
(
server_address
);
builder
.
RegisterService
(
&
service
);
gpr_free
(
server_address
);
auto
server
=
builder
.
BuildAndStart
();
while
(
!
got_sigint
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
5
));
}
}
}
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
grpc_init
();
grpc_init
();
ParseCommandLineFlags
(
&
argc
,
&
argv
,
true
);
ParseCommandLineFlags
(
&
argc
,
&
argv
,
true
);
GPR_ASSERT
(
FLAGS_server_port
);
signal
(
SIGINT
,
sigint_handler
);
if
(
FLAGS_workload
.
length
()
==
0
)
{
RunServer
();
RunTest
(
FLAGS_client_threads
,
FLAGS_client_channels
,
FLAGS_num_rpcs
,
FLAGS_payload_size
);
}
else
{
std
::
istringstream
workload
(
FLAGS_workload
);
int
client_threads
,
client_channels
,
num_rpcs
,
payload_size
;
workload
>>
client_threads
;
while
(
!
workload
.
eof
())
{
workload
>>
client_channels
>>
num_rpcs
>>
payload_size
;
RunTest
(
client_threads
,
client_channels
,
num_rpcs
,
payload_size
);
workload
>>
client_threads
;
}
gpr_log
(
GPR_INFO
,
"Done with specified workload."
);
}
grpc_shutdown
();
grpc_shutdown
();
return
0
;
return
0
;
...
...
This diff is collapsed.
Click to expand it.
test/cpp/qps/qpstest.proto
+
34
−
12
View file @
5c004c68
...
@@ -78,17 +78,24 @@ message Latencies {
...
@@ -78,17 +78,24 @@ message Latencies {
required
double
l_999
=
4
;
required
double
l_999
=
4
;
}
}
message
StartArgs
{
message
ClientArgs
{
required
string
server_host
=
1
;
required
string
server_target
=
1
;
required
int32
server_port
=
2
;
required
bool
enable_ssl
=
3
;
optional
bool
enable_ssl
=
3
[
default
=
false
];
required
int32
client_threads
=
4
;
optional
int32
client_threads
=
4
[
default
=
1
];
// We have a configurable number of channels for sending RPCs.
optional
int32
client_channels
=
5
[
default
=
-
1
];
// RPCs are sent round-robin on the available channels by the
optional
int32
num_rpcs
=
6
[
default
=
1
];
// various threads. Interesting cases are 1 global channel or
optional
int32
payload_size
=
7
[
default
=
1
];
// 1 per-thread channel, but we can support any number.
// The channels are assigned round-robin on an RPC by RPC basis
// rather than just at initialization time in order to also measure the
// impact of cache thrashing caused by channel changes. This is an issue
// if you are not in one of the above "interesting cases"
required
int32
client_channels
=
5
;
required
int32
num_rpcs
=
6
;
required
int32
payload_size
=
7
;
}
}
message
Star
tResult
{
message
Clien
tResult
{
required
Latencies
latencies
=
1
;
required
Latencies
latencies
=
1
;
required
int32
num_rpcs
=
2
;
required
int32
num_rpcs
=
2
;
required
double
time_elapsed
=
3
;
required
double
time_elapsed
=
3
;
...
@@ -96,6 +103,14 @@ message StartResult {
...
@@ -96,6 +103,14 @@ message StartResult {
required
double
time_system
=
5
;
required
double
time_system
=
5
;
}
}
message
ServerArgs
{
required
int32
threads
=
1
;
}
message
ServerStatus
{
optional
ServerStats
stats
=
1
;
}
message
SimpleRequest
{
message
SimpleRequest
{
// Desired payload type in the response from the server.
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
// If response_type is RANDOM, server randomly chooses one from other formats.
...
@@ -153,9 +168,6 @@ message StreamingOutputCallResponse {
...
@@ -153,9 +168,6 @@ message StreamingOutputCallResponse {
}
}
service
TestService
{
service
TestService
{
// Start test with specified workload
rpc
StartTest
(
StartArgs
)
returns
(
Latencies
);
// Collect stats from server, ignore request content
// Collect stats from server, ignore request content
rpc
CollectServerStats
(
StatsRequest
)
returns
(
ServerStats
);
rpc
CollectServerStats
(
StatsRequest
)
returns
(
ServerStats
);
...
@@ -186,3 +198,13 @@ service TestService {
...
@@ -186,3 +198,13 @@ service TestService {
rpc
HalfDuplexCall
(
stream
StreamingOutputCallRequest
)
rpc
HalfDuplexCall
(
stream
StreamingOutputCallRequest
)
returns
(
stream
StreamingOutputCallResponse
);
returns
(
stream
StreamingOutputCallResponse
);
}
}
service
QpsClient
{
// Start test with specified workload
rpc
RunTest
(
ClientArgs
)
returns
(
ClientResult
);
}
service
QpsServer
{
// Start test with specified workload
rpc
RunServer
(
stream
ServerArgs
)
returns
(
stream
ServerStatus
);
}
This diff is collapsed.
Click to expand it.
test/cpp/qps/server.cc
+
64
−
18
View file @
5c004c68
...
@@ -44,6 +44,7 @@
...
@@ -44,6 +44,7 @@
#include
<grpc++/server_builder.h>
#include
<grpc++/server_builder.h>
#include
<grpc++/server_context.h>
#include
<grpc++/server_context.h>
#include
<grpc++/status.h>
#include
<grpc++/status.h>
#include
<grpc++/stream.h>
#include
"src/cpp/server/thread_pool.h"
#include
"src/cpp/server/thread_pool.h"
#include
"test/core/util/grpc_profiler.h"
#include
"test/core/util/grpc_profiler.h"
#include
"test/cpp/qps/qpstest.pb.h"
#include
"test/cpp/qps/qpstest.pb.h"
...
@@ -51,13 +52,13 @@
...
@@ -51,13 +52,13 @@
#include
<grpc/grpc.h>
#include
<grpc/grpc.h>
#include
<grpc/support/log.h>
#include
<grpc/support/log.h>
DEFINE_bool
(
enable_ssl
,
false
,
"Whether to use ssl/tls."
);
DEFINE_int32
(
port
,
0
,
"Server port."
);
DEFINE_int32
(
port
,
0
,
"Server port."
);
DEFINE_int32
(
ser
ver_
threads
,
4
,
"
Number of server threads
."
);
DEFINE_int32
(
dri
ver_
port
,
0
,
"
Server driver port
."
);
using
grpc
::
Server
;
using
grpc
::
Server
;
using
grpc
::
ServerBuilder
;
using
grpc
::
ServerBuilder
;
using
grpc
::
ServerContext
;
using
grpc
::
ServerContext
;
using
grpc
::
ServerReaderWriter
;
using
grpc
::
ThreadPool
;
using
grpc
::
ThreadPool
;
using
grpc
::
testing
::
Payload
;
using
grpc
::
testing
::
Payload
;
using
grpc
::
testing
::
PayloadType
;
using
grpc
::
testing
::
PayloadType
;
...
@@ -66,6 +67,10 @@ using grpc::testing::SimpleRequest;
...
@@ -66,6 +67,10 @@ using grpc::testing::SimpleRequest;
using
grpc
::
testing
::
SimpleResponse
;
using
grpc
::
testing
::
SimpleResponse
;
using
grpc
::
testing
::
StatsRequest
;
using
grpc
::
testing
::
StatsRequest
;
using
grpc
::
testing
::
TestService
;
using
grpc
::
testing
::
TestService
;
using
grpc
::
testing
::
QpsServer
;
using
grpc
::
testing
::
ServerArgs
;
using
grpc
::
testing
::
ServerStats
;
using
grpc
::
testing
::
ServerStatus
;
using
grpc
::
Status
;
using
grpc
::
Status
;
// In some distros, gflags is in the namespace google, and in some others,
// In some distros, gflags is in the namespace google, and in some others,
...
@@ -124,34 +129,76 @@ class TestServiceImpl final : public TestService::Service {
...
@@ -124,34 +129,76 @@ class TestServiceImpl final : public TestService::Service {
}
// namespace
}
// namespace
class
ServerImpl
:
public
QpsServer
::
Service
{
public:
Status
RunServer
(
ServerContext
*
ctx
,
ServerReaderWriter
<
ServerStatus
,
ServerArgs
>*
stream
)
{
ServerArgs
args
;
std
::
unique_ptr
<
ServerStats
>
last_stats
;
if
(
!
stream
->
Read
(
&
args
))
return
Status
::
OK
;
bool
done
=
false
;
while
(
!
done
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
server_mu_
);
char
*
server_address
=
NULL
;
gpr_join_host_port
(
&
server_address
,
"::"
,
FLAGS_port
);
TestServiceImpl
service
;
ServerBuilder
builder
;
builder
.
AddPort
(
server_address
);
builder
.
RegisterService
(
&
service
);
gpr_free
(
server_address
);
std
::
unique_ptr
<
ThreadPool
>
pool
(
new
ThreadPool
(
args
.
threads
()));
builder
.
SetThreadPool
(
pool
.
get
());
auto
server
=
builder
.
BuildAndStart
();
gpr_log
(
GPR_INFO
,
"Server listening on %s
\n
"
,
server_address
);
ServerStatus
last_status
;
if
(
last_stats
.
get
())
{
*
last_status
.
mutable_stats
()
=
*
last_stats
;
}
if
(
!
stream
->
Write
(
last_status
))
return
Status
(
grpc
::
UNKNOWN
);
grpc_profiler_start
(
"qps_server.prof"
);
done
=
stream
->
Read
(
&
args
);
grpc_profiler_stop
();
}
ServerStatus
last_status
;
if
(
last_stats
.
get
())
{
*
last_status
.
mutable_stats
()
=
*
last_stats
;
}
stream
->
Write
(
last_status
);
return
Status
::
OK
;
}
private
:
std
::
mutex
server_mu_
;
};
static
void
RunServer
()
{
static
void
RunServer
()
{
char
*
server_address
=
NULL
;
char
*
server_address
=
NULL
;
gpr_join_host_port
(
&
server_address
,
"::"
,
FLAGS_port
);
gpr_join_host_port
(
&
server_address
,
"::"
,
FLAGS_driver_port
);
TestServiceImpl
service
;
SimpleRequest
request
;
ServerImpl
service
;
SimpleResponse
response
;
ServerBuilder
builder
;
ServerBuilder
builder
;
builder
.
AddPort
(
server_address
);
builder
.
AddPort
(
server_address
);
builder
.
RegisterService
(
&
service
);
builder
.
RegisterService
(
&
service
);
std
::
unique_ptr
<
ThreadPool
>
pool
(
new
ThreadPool
(
FLAGS_server_threads
));
gpr_free
(
server_address
);
builder
.
SetThreadPool
(
pool
.
get
());
std
::
unique_ptr
<
Server
>
server
(
builder
.
BuildAndStart
());
gpr_log
(
GPR_INFO
,
"Server listening on %s
\n
"
,
server_address
);
grpc_prof
iler
_start
(
"qps_server.prof"
);
auto
server
=
bu
il
d
er
.
BuildAndStart
(
);
while
(
!
got_sigint
)
{
while
(
!
got_sigint
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
5
));
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
5
));
}
}
grpc_profiler_stop
();
gpr_free
(
server_address
);
}
}
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
...
@@ -161,7 +208,6 @@ int main(int argc, char** argv) {
...
@@ -161,7 +208,6 @@ int main(int argc, char** argv) {
signal
(
SIGINT
,
sigint_handler
);
signal
(
SIGINT
,
sigint_handler
);
GPR_ASSERT
(
FLAGS_port
!=
0
);
GPR_ASSERT
(
FLAGS_port
!=
0
);
GPR_ASSERT
(
!
FLAGS_enable_ssl
);
RunServer
();
RunServer
();
grpc_shutdown
();
grpc_shutdown
();
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment