diff --git a/Makefile b/Makefile index f63926b02e00932029048133981093e87f3b051a..8be6c5fd0a44bdeaa4c8c15e619090f22846f60c 100644 --- a/Makefile +++ b/Makefile @@ -1837,6 +1837,11 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 ) +test_python: static_c + $(E) "[RUN] Testing python code" + $(Q) tools/run_tests/run_tests.py -lpython -c$(CONFIG) + + tools: privatelibs $(BINDIR)/$(CONFIG)/gen_hpack_tables $(BINDIR)/$(CONFIG)/grpc_create_jwt $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 $(BINDIR)/$(CONFIG)/grpc_print_google_default_creds_token buildbenchmarks: privatelibs $(BINDIR)/$(CONFIG)/grpc_completion_queue_benchmark $(BINDIR)/$(CONFIG)/low_level_ping_pong_benchmark diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index d98a8bbd5ff4d3b633dab50507199dc9918f70ed..cb4ce50e93469e22d55aeced65caca886549e234 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -89,46 +89,50 @@ class CompletionQueue { bool Next(void** tag, bool* ok) { return (AsyncNext(tag, ok, std::chrono::system_clock::time_point::max()) != SHUTDOWN); - } - - // Shutdown has to be called, and the CompletionQueue can only be - // destructed when false is returned from Next(). - void Shutdown(); - - grpc_completion_queue* cq() { return cq_; } - - private: - // Friend synchronous wrappers so that they can access Pluck(), which is - // a semi-private API geared towards the synchronous implementation. - template <class R> - friend class ::grpc::ClientReader; - template <class W> - friend class ::grpc::ClientWriter; - template <class R, class W> - friend class ::grpc::ClientReaderWriter; - template <class R> - friend class ::grpc::ServerReader; - template <class W> - friend class ::grpc::ServerWriter; - template <class R, class W> - friend class ::grpc::ServerReaderWriter; - friend class ::grpc::Server; - friend class ::grpc::ServerContext; - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); - - // Wraps grpc_completion_queue_pluck. - // Cannot be mixed with calls to Next(). - bool Pluck(CompletionQueueTag* tag); - - // Does a single polling pluck on tag - void TryPluck(CompletionQueueTag* tag); - - grpc_completion_queue* cq_; // owned -}; + + bool Next(void** tag, bool* ok) { + return ( + AsyncNext(tag, ok, (std::chrono::system_clock::time_point::max)()) != + SHUTDOWN); + } + + // Shutdown has to be called, and the CompletionQueue can only be + // destructed when false is returned from Next(). + void Shutdown(); + + grpc_completion_queue* cq() { return cq_; } + + private: + // Friend synchronous wrappers so that they can access Pluck(), which is + // a semi-private API geared towards the synchronous implementation. + template <class R> + friend class ::grpc::ClientReader; + template <class W> + friend class ::grpc::ClientWriter; + template <class R, class W> + friend class ::grpc::ClientReaderWriter; + template <class R> + friend class ::grpc::ServerReader; + template <class W> + friend class ::grpc::ServerWriter; + template <class R, class W> + friend class ::grpc::ServerReaderWriter; + friend class ::grpc::Server; + friend class ::grpc::ServerContext; + friend Status BlockingUnaryCall( + ChannelInterface * channel, const RpcMethod& method, + ClientContext* context, const grpc::protobuf::Message& request, + grpc::protobuf::Message* result); + + // Wraps grpc_completion_queue_pluck. + // Cannot be mixed with calls to Next(). + bool Pluck(CompletionQueueTag * tag); + + // Does a single polling pluck on tag + void TryPluck(CompletionQueueTag * tag); + + grpc_completion_queue* cq_; // owned + }; } // namespace grpc diff --git a/include/grpc++/credentials.h b/include/grpc++/credentials.h index c3f48592cc741ec01023b19adb56d51551dbad59..2ac3eec95cd33018f341afeef28541cc06f6c0f6 100644 --- a/include/grpc++/credentials.h +++ b/include/grpc++/credentials.h @@ -113,6 +113,12 @@ std::unique_ptr<Credentials> ServiceAccountCredentials( std::unique_ptr<Credentials> JWTCredentials( const grpc::string& json_key, std::chrono::seconds token_lifetime); +// Builds refresh token credentials. +// json_refresh_token is the JSON string containing the refresh token along +// with a client_id and client_secret. +std::unique_ptr<Credentials> RefreshTokenCredentials( + const grpc::string& json_refresh_token); + // Builds IAM credentials. std::unique_ptr<Credentials> IAMCredentials( const grpc::string& authorization_token, diff --git a/include/grpc/support/atm.h b/include/grpc/support/atm.h index feca6b30b23452476541f28456a1dd5fb43aaae0..ba8d7f579e19ba987d63ffce7158258ca715f19a 100644 --- a/include/grpc/support/atm.h +++ b/include/grpc/support/atm.h @@ -83,7 +83,7 @@ #include <grpc/support/atm_gcc_atomic.h> #elif defined(GPR_GCC_SYNC) #include <grpc/support/atm_gcc_sync.h> -#elif defined(GPR_WIN32) +#elif defined(GPR_WIN32_ATOMIC) #include <grpc/support/atm_win32.h> #else #error could not determine platform for atm diff --git a/include/grpc/support/atm_win32.h b/include/grpc/support/atm_win32.h index 18bf372004bbc878e97fb690b83f13bbc7eef8af..8b5322488ec28d6c9f780031c6210315b8224674 100644 --- a/include/grpc/support/atm_win32.h +++ b/include/grpc/support/atm_win32.h @@ -51,7 +51,7 @@ static __inline gpr_atm gpr_atm_acq_load(const gpr_atm *p) { static __inline gpr_atm gpr_atm_no_barrier_load(const gpr_atm *p) { /* TODO(dklempner): Can we implement something better here? */ - gpr_atm_acq_load(p); + return gpr_atm_acq_load(p); } static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) { diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index f04c2e76afadbefce4574b748b8796f3381db391..1b613dc2fd76675fc541baef7282658aa9e71ec9 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -43,11 +43,21 @@ #define GPR_ARCH_64 1 #define GPR_GETPID_IN_PROCESS_H 1 #define GPR_WINSOCK_SOCKET 1 +#ifdef __GNUC__ +#define GPR_GCC_ATOMIC 1 +#else +#define GPR_WIN32_ATOMIC 1 +#endif #elif defined(_WIN32) || defined(WIN32) #define GPR_ARCH_32 1 #define GPR_WIN32 1 #define GPR_GETPID_IN_PROCESS_H 1 #define GPR_WINSOCK_SOCKET 1 +#ifdef __GNUC__ +#define GPR_GCC_ATOMIC 1 +#else +#define GPR_WIN32_ATOMIC 1 +#endif #elif defined(ANDROID) || defined(__ANDROID__) #define GPR_ANDROID 1 #define GPR_ARCH_32 1 @@ -167,8 +177,8 @@ #endif /* Validate platform combinations */ -#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32) != 1 -#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32 +#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32_ATOMIC) != 1 +#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32_ATOMIC #endif #if defined(GPR_ARCH_32) + defined(GPR_ARCH_64) != 1 diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 8136602ae5a7fdf785d22682eb5d390ab48844fa..e4f85450f5a8c33ff54697e1da6a36805209c6bd 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -381,7 +381,7 @@ bool PrintStubFactory(const std::string& package_qualified_service_name, bool PrintPreamble(const FileDescriptor* file, Printer* out) { out->Print("import abc\n"); out->Print("from grpc.early_adopter import implementations\n"); - out->Print("from grpc.early_adopter import utilities\n"); + out->Print("from grpc.framework.alpha import utilities\n"); return true; } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index d1616a3450dc0953d3afcb7cf9480da4a166a339..f565cbf3aebf830f9a69ecae186ce1f54261c3e9 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -66,6 +66,10 @@ typedef struct channel_data { grpc_mdelem *status_ok; grpc_mdelem *status_not_found; grpc_mdstr *path_key; + grpc_mdstr *authority_key; + grpc_mdstr *host_key; + + grpc_mdctx *mdctx; size_t gettable_count; gettable *gettables; @@ -181,6 +185,15 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } calld->path = op->data.metadata; op->done_cb(op->user_data, GRPC_OP_OK); + } else if (op->data.metadata->key == channeld->host_key) { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( + channeld->mdctx, channeld->authority_key, op->data.metadata->value); + grpc_mdelem_unref(op->data.metadata); + op->data.metadata = authority; + /* pass the event up */ + grpc_call_next_op(elem, op); } else { /* pass the event up */ grpc_call_next_op(elem, op); @@ -305,9 +318,13 @@ static void init_channel_elem(grpc_channel_element *elem, channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); channeld->path_key = grpc_mdstr_from_string(mdctx, ":path"); + channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority"); + channeld->host_key = grpc_mdstr_from_string(mdctx, "host"); channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); + channeld->mdctx = mdctx; + /* initialize http download support */ channeld->gettable_count = 0; channeld->gettables = NULL; @@ -357,6 +374,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_mdelem_unref(channeld->grpc_scheme); grpc_mdelem_unref(channeld->content_type); grpc_mdstr_unref(channeld->path_key); + grpc_mdstr_unref(channeld->authority_key); + grpc_mdstr_unref(channeld->host_key); } const grpc_channel_filter grpc_http_server_filter = { diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 412dc0764ffc35d85e9f0ff0ccd24fd29ad2f242..e3c66376236e7e6c4b0a65073f723704794e25aa 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -123,6 +123,13 @@ std::unique_ptr<Credentials> JWTCredentials( grpc_jwt_credentials_create(json_key.c_str(), lifetime)); } +// Builds refresh token credentials. +std::unique_ptr<Credentials> RefreshTokenCredentials( + const grpc::string& json_refresh_token) { + return WrapCredentials( + grpc_refresh_token_credentials_create(json_refresh_token.c_str())); +} + // Builds IAM credentials. std::unique_ptr<Credentials> IAMCredentials( const grpc::string& authorization_token, diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index afb654178318127ec38b454cc2379fd41ddf1eae..8cc3e38cd9513d78d8a055a00731132876a2d90c 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -75,6 +75,9 @@ using v8::Value; NanCallback *Call::constructor; Persistent<FunctionTemplate> Call::fun_tpl; +bool EndsWith(const char *str, const char *substr) { + return strcmp(str+strlen(str)-strlen(substr), substr) == 0; +} bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, shared_ptr<Resources> resources) { @@ -99,14 +102,19 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, Handle<Value> value = values->Get(j); grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; - if (::node::Buffer::HasInstance(value)) { - current->value = ::node::Buffer::Data(value); - current->value_length = ::node::Buffer::Length(value); - Persistent<Value> *handle = new Persistent<Value>(); - NanAssignPersistent(*handle, value); - resources->handles.push_back(unique_ptr<PersistentHolder>( - new PersistentHolder(handle))); - } else if (value->IsString()) { + // Only allow binary headers for "-bin" keys + if (EndsWith(current->key, "-bin")) { + if (::node::Buffer::HasInstance(value)) { + current->value = ::node::Buffer::Data(value); + current->value_length = ::node::Buffer::Length(value); + Persistent<Value> *handle = new Persistent<Value>(); + NanAssignPersistent(*handle, value); + resources->handles.push_back(unique_ptr<PersistentHolder>( + new PersistentHolder(handle))); + continue; + } + } + if (value->IsString()) { Handle<String> string_value = value->ToString(); NanUtf8String *utf8_value = new NanUtf8String(string_value); resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); @@ -146,9 +154,13 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { array = NanNew<Array>(size_map[elem->key]); metadata_object->Set(key_string, array); } - array->Set(index_map[elem->key], - MakeFastBuffer( - NanNewBufferHandle(elem->value, elem->value_length))); + if (EndsWith(elem->key, "-bin")) { + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + } else { + array->Set(index_map[elem->key], NanNew(elem->value)); + } index_map[elem->key] += 1; } return NanEscapeScope(metadata_object); diff --git a/src/node/package.json b/src/node/package.json index 1d0aa0e669d98dd217fbd032b166b046acdc8bd8..9f52f8c988ebe252284ccfba9484a68ffe1d08ff 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.5.5", + "version": "0.6.0", "author": "Google Inc.", "description": "gRPC Library for Node", "homepage": "http://www.grpc.io/", diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index 7b2b36ae3710add87bcaa6f6d03fe1003bdac167..98158ffff357d0efed892e8c91d6c8851d7c7d4e 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -142,8 +142,8 @@ describe('call', function() { assert.doesNotThrow(function() { var batch = {}; batch[grpc.opType.SEND_INITIAL_METADATA] = { - 'key1': [new Buffer('value1')], - 'key2': [new Buffer('value2')] + 'key1-bin': [new Buffer('value1')], + 'key2-bin': [new Buffer('value2')] }; call.startBatch(batch, function(err, resp) { assert.ifError(err); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index c39364d410ef863b37988fd3581bdd2a416e13a5..60e9861bc863ea533ce5646851b5ad8e80d8be24 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -138,21 +138,21 @@ describe('end-to-end', function() { client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { assert.ifError(err); - assert(response['send metadata']); - assert(response['client close']); - assert(response.hasOwnProperty('metadata')); - assert.strictEqual(response.metadata.server_key[0].toString(), - 'server_value'); - assert.deepEqual(response.status, {'code': grpc.status.OK, - 'details': status_text, - 'metadata': {}}); + assert.deepEqual(response,{ + 'send metadata': true, + 'client close': true, + metadata: {server_key: ['server_value']}, + status: {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}} + }); done(); }); server.requestCall(function(err, call_details) { var new_call = call_details['new call']; assert.notEqual(new_call, null); - assert.strictEqual(new_call.metadata.client_key[0].toString(), + assert.strictEqual(new_call.metadata.client_key[0], 'client_value'); var server_call = new_call.call; assert.notEqual(server_call, null); diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index 4f83ccc085b847685071e965b15f5ddf883da796..79550a37893f506c6e1c27e9cb55a76db0222c8b 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -32,7 +32,7 @@ import enum import threading -from grpc.early_adopter import utilities +from grpc.framework.alpha import utilities from interop import empty_pb2 from interop import messages_pb2 diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index 2542eb6da4d794e5b4636e4e8723e130a3e2ca44..923e889844ddb99654a9e02120fc259e125626dc 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -34,7 +34,7 @@ import unittest from grpc._adapter import fore from grpc._adapter import rear from grpc.framework.base import util -from grpc.framework.base.packets import implementations as tickets_implementations +from grpc.framework.base import implementations as base_implementations from grpc.framework.face import implementations as face_implementations from grpc.framework.face.testing import coverage from grpc.framework.face.testing import serial @@ -69,8 +69,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): serialization.request_serializers, serialization.response_deserializers, False, None, None, None) rear_link.start() - front = tickets_implementations.front(pool, pool, pool) - back = tickets_implementations.back( + front = base_implementations.front_link(pool, pool, pool) + back = base_implementations.back_link( servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT) fore_link.join_rear_link(back) back.join_fore_link(fore_link) diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index 49fd1f7a1ca5f48d2176671e6e3278e3488c19a5..cfdcc2c4bc9c09edbeee7087f940f64e653725c9 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -37,7 +37,6 @@ from grpc._adapter import _test_links from grpc._adapter import fore from grpc._adapter import rear from grpc.framework.base import interfaces -from grpc.framework.base.packets import packets as tickets from grpc.framework.foundation import logging_pool _IDENTITY = lambda x: x @@ -60,11 +59,11 @@ class RoundTripTest(unittest.TestCase): test_fore_link = _test_links.ForeLink(None, None) def rear_action(front_to_back_ticket, fore_link): if front_to_back_ticket.kind in ( - tickets.FrontToBackPacket.Kind.COMPLETION, - tickets.FrontToBackPacket.Kind.ENTIRE): - back_to_front_ticket = tickets.BackToFrontPacket( + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE): + back_to_front_ticket = interfaces.BackToFrontTicket( front_to_back_ticket.operation_id, 0, - tickets.BackToFrontPacket.Kind.COMPLETION, None) + interfaces.BackToFrontTicket.Kind.COMPLETION, None) fore_link.accept_back_to_front_ticket(back_to_front_ticket) test_rear_link = _test_links.RearLink(rear_action, None) @@ -82,8 +81,8 @@ class RoundTripTest(unittest.TestCase): test_fore_link.join_rear_link(rear_link) rear_link.start() - front_to_back_ticket = tickets.FrontToBackPacket( - test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE, + front_to_back_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE, test_method, interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) @@ -91,7 +90,7 @@ class RoundTripTest(unittest.TestCase): with test_fore_link.condition: while (not test_fore_link.tickets or test_fore_link.tickets[-1].kind is - tickets.BackToFrontPacket.Kind.CONTINUATION): + interfaces.BackToFrontTicket.Kind.CONTINUATION): test_fore_link.condition.wait() rear_link.stop() @@ -100,7 +99,7 @@ class RoundTripTest(unittest.TestCase): with test_fore_link.condition: self.assertIs( test_fore_link.tickets[-1].kind, - tickets.BackToFrontPacket.Kind.COMPLETION) + interfaces.BackToFrontTicket.Kind.COMPLETION) def testEntireRoundTrip(self): test_operation_id = object() @@ -115,14 +114,14 @@ class RoundTripTest(unittest.TestCase): else: payload = test_back_to_front_datum terminal = front_to_back_ticket.kind in ( - tickets.FrontToBackPacket.Kind.COMPLETION, - tickets.FrontToBackPacket.Kind.ENTIRE) + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE) if payload is not None or terminal: if terminal: - kind = tickets.BackToFrontPacket.Kind.COMPLETION + kind = interfaces.BackToFrontTicket.Kind.COMPLETION else: - kind = tickets.BackToFrontPacket.Kind.CONTINUATION - back_to_front_ticket = tickets.BackToFrontPacket( + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION + back_to_front_ticket = interfaces.BackToFrontTicket( front_to_back_ticket.operation_id, rear_sequence_number[0], kind, payload) rear_sequence_number[0] += 1 @@ -144,8 +143,8 @@ class RoundTripTest(unittest.TestCase): test_fore_link.join_rear_link(rear_link) rear_link.start() - front_to_back_ticket = tickets.FrontToBackPacket( - test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE, + front_to_back_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE, test_method, interfaces.ServicedSubscription.Kind.FULL, None, test_front_to_back_datum, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) @@ -153,7 +152,7 @@ class RoundTripTest(unittest.TestCase): with test_fore_link.condition: while (not test_fore_link.tickets or test_fore_link.tickets[-1].kind is not - tickets.BackToFrontPacket.Kind.COMPLETION): + interfaces.BackToFrontTicket.Kind.COMPLETION): test_fore_link.condition.wait() rear_link.stop() @@ -183,14 +182,14 @@ class RoundTripTest(unittest.TestCase): else: response = None terminal = front_to_back_ticket.kind in ( - tickets.FrontToBackPacket.Kind.COMPLETION, - tickets.FrontToBackPacket.Kind.ENTIRE) + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE) if response is not None or terminal: if terminal: - kind = tickets.BackToFrontPacket.Kind.COMPLETION + kind = interfaces.BackToFrontTicket.Kind.COMPLETION else: - kind = tickets.BackToFrontPacket.Kind.CONTINUATION - back_to_front_ticket = tickets.BackToFrontPacket( + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION + back_to_front_ticket = interfaces.BackToFrontTicket( front_to_back_ticket.operation_id, rear_sequence_number[0], kind, response) rear_sequence_number[0] += 1 @@ -213,22 +212,23 @@ class RoundTripTest(unittest.TestCase): test_fore_link.join_rear_link(rear_link) rear_link.start() - commencement_ticket = tickets.FrontToBackPacket( - test_operation_id, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT, - test_method, interfaces.ServicedSubscription.Kind.FULL, None, None, + commencement_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, + interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method, + interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) fore_sequence_number = 1 rear_link.accept_front_to_back_ticket(commencement_ticket) for request in scenario.requests(): - continuation_ticket = tickets.FrontToBackPacket( + continuation_ticket = interfaces.FrontToBackTicket( test_operation_id, fore_sequence_number, - tickets.FrontToBackPacket.Kind.CONTINUATION, None, None, None, + interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None, request, None) fore_sequence_number += 1 rear_link.accept_front_to_back_ticket(continuation_ticket) - completion_ticket = tickets.FrontToBackPacket( + completion_ticket = interfaces.FrontToBackTicket( test_operation_id, fore_sequence_number, - tickets.FrontToBackPacket.Kind.COMPLETION, None, None, None, None, + interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None, None) fore_sequence_number += 1 rear_link.accept_front_to_back_ticket(completion_ticket) @@ -236,7 +236,7 @@ class RoundTripTest(unittest.TestCase): with test_fore_link.condition: while (not test_fore_link.tickets or test_fore_link.tickets[-1].kind is not - tickets.BackToFrontPacket.Kind.COMPLETION): + interfaces.BackToFrontTicket.Kind.COMPLETION): test_fore_link.condition.wait() rear_link.stop() diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py index ead0b9eb38322b50b7cc2a8fa352a3090298296d..25799d679c70221265d709f26f5c2b3ddeb4515d 100644 --- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py @@ -34,7 +34,6 @@ import unittest from grpc._adapter import _test_links from grpc._adapter import rear from grpc.framework.base import interfaces -from grpc.framework.base.packets import packets from grpc.framework.foundation import logging_pool _IDENTITY = lambda x: x @@ -68,7 +67,7 @@ class LonelyRearLinkTest(unittest.TestCase): rear_link.join_fore_link(fore_link) rear_link.start() - front_to_back_ticket = packets.FrontToBackPacket( + front_to_back_ticket = interfaces.FrontToBackTicket( test_operation_id, 0, front_to_back_ticket_kind, test_method, interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) @@ -77,7 +76,7 @@ class LonelyRearLinkTest(unittest.TestCase): while True: if (fore_link.tickets and fore_link.tickets[-1].kind is not - packets.BackToFrontPacket.Kind.CONTINUATION): + interfaces.BackToFrontTicket.Kind.CONTINUATION): break fore_link.condition.wait() @@ -86,15 +85,15 @@ class LonelyRearLinkTest(unittest.TestCase): with fore_link.condition: self.assertIsNot( fore_link.tickets[-1].kind, - packets.BackToFrontPacket.Kind.COMPLETION) + interfaces.BackToFrontTicket.Kind.COMPLETION) - def testLonelyClientCommencementPacket(self): + def testLonelyClientCommencementTicket(self): self._perform_lonely_client_test_with_ticket_kind( - packets.FrontToBackPacket.Kind.COMMENCEMENT) + interfaces.FrontToBackTicket.Kind.COMMENCEMENT) - def testLonelyClientEntirePacket(self): + def testLonelyClientEntireTicket(self): self._perform_lonely_client_test_with_ticket_kind( - packets.FrontToBackPacket.Kind.ENTIRE) + interfaces.FrontToBackTicket.Kind.ENTIRE) if __name__ == '__main__': diff --git a/src/python/src/grpc/_adapter/_test_links.py b/src/python/src/grpc/_adapter/_test_links.py index ac0d6e20b69ec6f2fb67040800f5cc8a68d615f6..86c7e61b17ddcf2bfd356e62fc2f2211bc45ed96 100644 --- a/src/python/src/grpc/_adapter/_test_links.py +++ b/src/python/src/grpc/_adapter/_test_links.py @@ -31,7 +31,7 @@ import threading -from grpc.framework.base.packets import interfaces +from grpc.framework.base import interfaces class ForeLink(interfaces.ForeLink): diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 16e5a2018cbed7d9e89a26d7372a10144a90ac60..05016cdaf3130459c6291c656d08bb0e4060c8e1 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -36,10 +36,8 @@ import time from grpc._adapter import _common from grpc._adapter import _low -from grpc.framework.base import interfaces -from grpc.framework.base.packets import interfaces as ticket_interfaces -from grpc.framework.base.packets import null -from grpc.framework.base.packets import packets as tickets +from grpc.framework.base import interfaces as base_interfaces +from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool @@ -69,7 +67,7 @@ def _status(call, rpc_state): rpc_state.write.low = _LowWrite.CLOSED -class ForeLink(ticket_interfaces.ForeLink, activated.Activated): +class ForeLink(base_interfaces.ForeLink, activated.Activated): """A service-side bridge between RPC Framework and the C-ish _low code.""" def __init__( @@ -127,9 +125,9 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): self._request_deserializers[method], self._response_serializers[method]) - ticket = tickets.FrontToBackPacket( - call, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT, method, - interfaces.ServicedSubscription.Kind.FULL, None, None, + ticket = base_interfaces.FrontToBackTicket( + call, 0, base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT, method, + base_interfaces.ServicedSubscription.Kind.FULL, None, None, service_acceptance.deadline - time.time()) self._rear_link.accept_front_to_back_ticket(ticket) @@ -145,14 +143,16 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): sequence_number = rpc_state.sequence_number rpc_state.sequence_number += 1 if event.bytes is None: - ticket = tickets.FrontToBackPacket( - call, sequence_number, tickets.FrontToBackPacket.Kind.COMPLETION, - None, None, None, None, None) + ticket = base_interfaces.FrontToBackTicket( + call, sequence_number, + base_interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, + None, None) else: call.read(call) - ticket = tickets.FrontToBackPacket( - call, sequence_number, tickets.FrontToBackPacket.Kind.CONTINUATION, - None, None, None, rpc_state.deserializer(event.bytes), None) + ticket = base_interfaces.FrontToBackTicket( + call, sequence_number, + base_interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, + None, rpc_state.deserializer(event.bytes), None) self._rear_link.accept_front_to_back_ticket(ticket) @@ -180,10 +180,10 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): sequence_number = rpc_state.sequence_number rpc_state.sequence_number += 1 - ticket = tickets.FrontToBackPacket( + ticket = base_interfaces.FrontToBackTicket( call, sequence_number, - tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None, - None, None, None) + base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None, + None, None, None, None) self._rear_link.accept_front_to_back_ticket(ticket) def _on_finish_event(self, event): @@ -200,19 +200,21 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): sequence_number = rpc_state.sequence_number rpc_state.sequence_number += 1 if code is _low.Code.CANCELLED: - ticket = tickets.FrontToBackPacket( - call, sequence_number, tickets.FrontToBackPacket.Kind.CANCELLATION, - None, None, None, None, None) + ticket = base_interfaces.FrontToBackTicket( + call, sequence_number, + base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None, + None, None, None) elif code is _low.Code.EXPIRED: - ticket = tickets.FrontToBackPacket( - call, sequence_number, tickets.FrontToBackPacket.Kind.EXPIRATION, - None, None, None, None, None) + ticket = base_interfaces.FrontToBackTicket( + call, sequence_number, + base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None, + None, None) else: # TODO(nathaniel): Better mapping of codes to ticket-categories - ticket = tickets.FrontToBackPacket( + ticket = base_interfaces.FrontToBackTicket( call, sequence_number, - tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None, - None, None, None) + base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None, + None, None, None, None) self._rear_link.accept_front_to_back_ticket(ticket) def _spin(self, completion_queue, server): @@ -268,7 +270,7 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): self._rpc_states.pop(call, None) def join_rear_link(self, rear_link): - """See ticket_interfaces.ForeLink.join_rear_link for specification.""" + """See base_interfaces.ForeLink.join_rear_link for specification.""" self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link def _start(self): @@ -348,14 +350,14 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): return self._port def accept_back_to_front_ticket(self, ticket): - """See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec.""" + """See base_interfaces.ForeLink.accept_back_to_front_ticket for spec.""" with self._condition: if self._server is None: return - if ticket.kind is tickets.BackToFrontPacket.Kind.CONTINUATION: + if ticket.kind is base_interfaces.BackToFrontTicket.Kind.CONTINUATION: self._continue(ticket.operation_id, ticket.payload) - elif ticket.kind is tickets.BackToFrontPacket.Kind.COMPLETION: + elif ticket.kind is base_interfaces.BackToFrontTicket.Kind.COMPLETION: self._complete(ticket.operation_id, ticket.payload) else: self._cancel(ticket.operation_id) diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index eee008e4dfa40a6e58adb3d003701ae9357fbeae..f19321c4266d817c328cc7c54ef1c64cc3089d37 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -36,9 +36,8 @@ import time from grpc._adapter import _common from grpc._adapter import _low -from grpc.framework.base.packets import interfaces as ticket_interfaces -from grpc.framework.base.packets import null -from grpc.framework.base.packets import packets as tickets +from grpc.framework.base import interfaces as base_interfaces +from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool @@ -88,7 +87,7 @@ def _write(operation_id, call, outstanding, write_state, serialized_payload): raise ValueError('Write attempted after writes completed!') -class RearLink(ticket_interfaces.RearLink, activated.Activated): +class RearLink(base_interfaces.RearLink, activated.Activated): """An invocation-side bridge between RPC Framework and the C-ish _low code.""" def __init__( @@ -152,9 +151,9 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): else: logging.error('RPC write not accepted! Event: %s', (event,)) rpc_state.active = False - ticket = tickets.BackToFrontPacket( + ticket = base_interfaces.BackToFrontTicket( operation_id, rpc_state.common.sequence_number, - tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None) + base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None) rpc_state.common.sequence_number += 1 self._fore_link.accept_back_to_front_ticket(ticket) @@ -163,9 +162,9 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): rpc_state.call.read(operation_id) rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED) - ticket = tickets.BackToFrontPacket( + ticket = base_interfaces.BackToFrontTicket( operation_id, rpc_state.common.sequence_number, - tickets.BackToFrontPacket.Kind.CONTINUATION, + base_interfaces.BackToFrontTicket.Kind.CONTINUATION, rpc_state.common.deserializer(event.bytes)) rpc_state.common.sequence_number += 1 self._fore_link.accept_back_to_front_ticket(ticket) @@ -174,9 +173,9 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): if not event.complete_accepted: logging.error('RPC complete not accepted! Event: %s', (event,)) rpc_state.active = False - ticket = tickets.BackToFrontPacket( + ticket = base_interfaces.BackToFrontTicket( operation_id, rpc_state.common.sequence_number, - tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None) + base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None) rpc_state.common.sequence_number += 1 self._fore_link.accept_back_to_front_ticket(ticket) @@ -189,14 +188,14 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): """Handle termination of an RPC.""" # TODO(nathaniel): Cover all statuses. if event.status.code is _low.Code.OK: - kind = tickets.BackToFrontPacket.Kind.COMPLETION + kind = base_interfaces.BackToFrontTicket.Kind.COMPLETION elif event.status.code is _low.Code.CANCELLED: - kind = tickets.BackToFrontPacket.Kind.CANCELLATION + kind = base_interfaces.BackToFrontTicket.Kind.CANCELLATION elif event.status.code is _low.Code.EXPIRED: - kind = tickets.BackToFrontPacket.Kind.EXPIRATION + kind = base_interfaces.BackToFrontTicket.Kind.EXPIRATION else: - kind = tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE - ticket = tickets.BackToFrontPacket( + kind = base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE + ticket = base_interfaces.BackToFrontTicket( operation_id, rpc_state.common.sequence_number, kind, None) rpc_state.common.sequence_number += 1 self._fore_link.accept_back_to_front_ticket(ticket) @@ -317,7 +316,7 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): rpc_state.active = False def join_fore_link(self, fore_link): - """See ticket_interfaces.RearLink.join_fore_link for specification.""" + """See base_interfaces.RearLink.join_fore_link for specification.""" with self._condition: self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link @@ -366,22 +365,22 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): self._stop() def accept_front_to_back_ticket(self, ticket): - """See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec.""" + """See base_interfaces.RearLink.accept_front_to_back_ticket for spec.""" with self._condition: if self._completion_queue is None: return - if ticket.kind is tickets.FrontToBackPacket.Kind.COMMENCEMENT: + if ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT: self._commence( ticket.operation_id, ticket.name, ticket.payload, ticket.timeout) - elif ticket.kind is tickets.FrontToBackPacket.Kind.CONTINUATION: + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CONTINUATION: self._continue(ticket.operation_id, ticket.payload) - elif ticket.kind is tickets.FrontToBackPacket.Kind.COMPLETION: + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMPLETION: self._complete(ticket.operation_id, ticket.payload) - elif ticket.kind is tickets.FrontToBackPacket.Kind.ENTIRE: + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.ENTIRE: self._entire( ticket.operation_id, ticket.name, ticket.payload, ticket.timeout) - elif ticket.kind is tickets.FrontToBackPacket.Kind.CANCELLATION: + elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CANCELLATION: self._cancel(ticket.operation_id) else: # NOTE(nathaniel): All other categories are treated as cancellation. diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 7c50d7d8b26592e64af16ce43a0ec3c8443aa0e5..cc0b8ec9e82912c8079d6f6b2b0ac9be374485d5 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -33,11 +33,11 @@ import threading from grpc._adapter import fore as _fore from grpc._adapter import rear as _rear -from grpc.early_adopter import _face_utilities -from grpc.early_adopter import _reexport -from grpc.early_adopter import interfaces +from grpc.framework.alpha import _face_utilities +from grpc.framework.alpha import _reexport +from grpc.framework.alpha import interfaces +from grpc.framework.base import implementations as _base_implementations from grpc.framework.base import util as _base_utilities -from grpc.framework.base.packets import implementations as _tickets_implementations from grpc.framework.face import implementations as _face_implementations from grpc.framework.foundation import logging_pool @@ -66,7 +66,7 @@ class _Server(interfaces.Server): self._pool = logging_pool.pool(_THREAD_POOL_SIZE) servicer = _face_implementations.servicer( self._pool, self._breakdown.implementations, None) - self._back = _tickets_implementations.back( + self._back = _base_implementations.back_link( servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, _ONE_DAY_IN_SECONDS) self._fore_link = _fore.ForeLink( @@ -134,7 +134,7 @@ class _Stub(interfaces.Stub): with self._lock: if self._pool is None: self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._front = _tickets_implementations.front( + self._front = _base_implementations.front_link( self._pool, self._pool, self._pool) self._rear_link = _rear.RearLink( self._host, self._port, self._pool, diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py index 949d3def3deca92bac7054e474184deb8c79ffe1..ae4adad90f55f8350c7ecc35dc21c31c3eb80742 100644 --- a/src/python/src/grpc/early_adopter/implementations_test.py +++ b/src/python/src/grpc/early_adopter/implementations_test.py @@ -34,7 +34,7 @@ import unittest from grpc.early_adopter import implementations -from grpc.early_adopter import utilities +from grpc.framework.alpha import utilities from grpc._junkdrawer import math_pb2 SERVICE_NAME = 'math.Math' diff --git a/src/python/src/grpc/framework/base/packets/__init__.py b/src/python/src/grpc/framework/alpha/__init__.py similarity index 99% rename from src/python/src/grpc/framework/base/packets/__init__.py rename to src/python/src/grpc/framework/alpha/__init__.py index 708651910607ffb686d781713f6893567821b9fd..b89398809fabc7c2aab0462cc52a91e31b76e9e6 100644 --- a/src/python/src/grpc/framework/base/packets/__init__.py +++ b/src/python/src/grpc/framework/alpha/__init__.py @@ -26,5 +26,3 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/src/grpc/early_adopter/_face_utilities.py b/src/python/src/grpc/framework/alpha/_face_utilities.py similarity index 98% rename from src/python/src/grpc/early_adopter/_face_utilities.py rename to src/python/src/grpc/framework/alpha/_face_utilities.py index ce099fc22f0788d0b3c5d54fc5c3657a079d8b4b..fb0cfe426d0614f27f381e5dd2be0fd764e33647 100644 --- a/src/python/src/grpc/early_adopter/_face_utilities.py +++ b/src/python/src/grpc/framework/alpha/_face_utilities.py @@ -34,8 +34,8 @@ import collections from grpc.framework.common import cardinality from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import from grpc.framework.face import utilities as face_utilities -from grpc.early_adopter import _reexport -from grpc.early_adopter import interfaces +from grpc.framework.alpha import _reexport +from grpc.framework.alpha import interfaces def _qualified_name(service_name, method_name): diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/framework/alpha/_reexport.py similarity index 98% rename from src/python/src/grpc/early_adopter/_reexport.py rename to src/python/src/grpc/framework/alpha/_reexport.py index 49bc38e203ced30492baf3cf662c28617ba4971a..198cb95ad5cb81cb2585b3650a29a32e998d2401 100644 --- a/src/python/src/grpc/early_adopter/_reexport.py +++ b/src/python/src/grpc/framework/alpha/_reexport.py @@ -31,8 +31,8 @@ from grpc.framework.common import cardinality from grpc.framework.face import exceptions as face_exceptions from grpc.framework.face import interfaces as face_interfaces from grpc.framework.foundation import future -from grpc.early_adopter import exceptions -from grpc.early_adopter import interfaces +from grpc.framework.alpha import exceptions +from grpc.framework.alpha import interfaces _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY = { interfaces.Cardinality.UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, diff --git a/src/python/src/grpc/early_adopter/exceptions.py b/src/python/src/grpc/framework/alpha/exceptions.py similarity index 100% rename from src/python/src/grpc/early_adopter/exceptions.py rename to src/python/src/grpc/framework/alpha/exceptions.py diff --git a/src/python/src/grpc/early_adopter/interfaces.py b/src/python/src/grpc/framework/alpha/interfaces.py similarity index 99% rename from src/python/src/grpc/early_adopter/interfaces.py rename to src/python/src/grpc/framework/alpha/interfaces.py index b733873c1c965b213e56b13745b92f8aca58686d..8380567c97205dfa35676a314e92578e0f3631f4 100644 --- a/src/python/src/grpc/early_adopter/interfaces.py +++ b/src/python/src/grpc/framework/alpha/interfaces.py @@ -33,7 +33,7 @@ import abc import enum # exceptions is referenced from specification in this module. -from grpc.early_adopter import exceptions # pylint: disable=unused-import +from grpc.framework.alpha import exceptions # pylint: disable=unused-import from grpc.framework.foundation import activated from grpc.framework.foundation import future diff --git a/src/python/src/grpc/early_adopter/utilities.py b/src/python/src/grpc/framework/alpha/utilities.py similarity index 99% rename from src/python/src/grpc/early_adopter/utilities.py rename to src/python/src/grpc/framework/alpha/utilities.py index da8ef825aa5d5df2344ac5899b5f0f7787a18c6d..7d7f78f5e44d6bc8a03263b5052ba8da76746ac5 100644 --- a/src/python/src/grpc/early_adopter/utilities.py +++ b/src/python/src/grpc/framework/alpha/utilities.py @@ -29,7 +29,7 @@ """Utilities for use with GRPC.""" -from grpc.early_adopter import interfaces +from grpc.framework.alpha import interfaces class _RpcMethodDescription( diff --git a/src/python/src/grpc/framework/base/packets/_cancellation.py b/src/python/src/grpc/framework/base/_cancellation.py similarity index 89% rename from src/python/src/grpc/framework/base/packets/_cancellation.py rename to src/python/src/grpc/framework/base/_cancellation.py index 4a0ced1440c9e31bd508828e5efff5181b2b939c..ffbc90668fb843d92063b85d12949e9c8779dec2 100644 --- a/src/python/src/grpc/framework/base/packets/_cancellation.py +++ b/src/python/src/grpc/framework/base/_cancellation.py @@ -29,9 +29,8 @@ """State and behavior for operation cancellation.""" -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets +from grpc.framework.base import _interfaces +from grpc.framework.base import interfaces class CancellationManager(_interfaces.CancellationManager): @@ -59,7 +58,7 @@ class CancellationManager(_interfaces.CancellationManager): def cancel(self): """See _interfaces.CancellationManager.cancel for specification.""" with self._lock: - self._termination_manager.abort(base_interfaces.Outcome.CANCELLED) - self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED) + self._termination_manager.abort(interfaces.Outcome.CANCELLED) + self._transmission_manager.abort(interfaces.Outcome.CANCELLED) self._ingestion_manager.abort() self._expiration_manager.abort() diff --git a/src/python/src/grpc/framework/base/packets/_constants.py b/src/python/src/grpc/framework/base/_constants.py similarity index 100% rename from src/python/src/grpc/framework/base/packets/_constants.py rename to src/python/src/grpc/framework/base/_constants.py diff --git a/src/python/src/grpc/framework/base/packets/_context.py b/src/python/src/grpc/framework/base/_context.py similarity index 86% rename from src/python/src/grpc/framework/base/packets/_context.py rename to src/python/src/grpc/framework/base/_context.py index 45241c639e8b96202f8bf589447b87e2e6a8b024..d84871d6399db8d7bb04d8d31457ad0b729052e7 100644 --- a/src/python/src/grpc/framework/base/packets/_context.py +++ b/src/python/src/grpc/framework/base/_context.py @@ -32,12 +32,12 @@ import time # _interfaces is referenced from specification in this module. -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import +from grpc.framework.base import interfaces +from grpc.framework.base import _interfaces # pylint: disable=unused-import -class OperationContext(base_interfaces.OperationContext): - """An implementation of base_interfaces.OperationContext.""" +class OperationContext(interfaces.OperationContext): + """An implementation of interfaces.OperationContext.""" def __init__( self, lock, operation_id, local_failure, termination_manager, @@ -47,8 +47,8 @@ class OperationContext(base_interfaces.OperationContext): Args: lock: The operation-wide lock. operation_id: An object identifying the operation. - local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE - or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of + local_failure: Whichever one of interfaces.Outcome.SERVICED_FAILURE or + interfaces.Outcome.SERVICER_FAILURE describes local failure of customer code. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the @@ -75,12 +75,12 @@ class OperationContext(base_interfaces.OperationContext): self._expiration_manager = expiration_manager def is_active(self): - """See base_interfaces.OperationContext.is_active for specification.""" + """See interfaces.OperationContext.is_active for specification.""" with self._lock: return self._termination_manager.is_active() def add_termination_callback(self, callback): - """See base_interfaces.OperationContext.add_termination_callback.""" + """See interfaces.OperationContext.add_termination_callback.""" with self._lock: self._termination_manager.add_callback(callback) diff --git a/src/python/src/grpc/framework/base/packets/_emission.py b/src/python/src/grpc/framework/base/_emission.py similarity index 89% rename from src/python/src/grpc/framework/base/packets/_emission.py rename to src/python/src/grpc/framework/base/_emission.py index cfc9e40a24263fbd42a3d890ab74ec9b4e082f3d..1829669a72b8e3fa0531efa715669d6fcc89c1ab 100644 --- a/src/python/src/grpc/framework/base/packets/_emission.py +++ b/src/python/src/grpc/framework/base/_emission.py @@ -29,8 +29,8 @@ """State and behavior for handling emitted values.""" -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _interfaces +from grpc.framework.base import interfaces +from grpc.framework.base import _interfaces class _EmissionManager(_interfaces.EmissionManager): @@ -42,10 +42,9 @@ class _EmissionManager(_interfaces.EmissionManager): Args: lock: The operation-wide lock. - failure_outcome: Whichever one of - base_interfaces.Outcome.SERVICED_FAILURE or - base_interfaces.Outcome.SERVICER_FAILURE describes this object's - methods being called inappropriately by customer code. + failure_outcome: Whichever one of interfaces.Outcome.SERVICED_FAILURE or + interfaces.Outcome.SERVICER_FAILURE describes this object's methods + being called inappropriately by customer code. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. @@ -106,7 +105,7 @@ def front_emission_manager(lock, termination_manager, transmission_manager): An _interfaces.EmissionManager appropriate for front-side use. """ return _EmissionManager( - lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager, + lock, interfaces.Outcome.SERVICED_FAILURE, termination_manager, transmission_manager) @@ -122,5 +121,5 @@ def back_emission_manager(lock, termination_manager, transmission_manager): An _interfaces.EmissionManager appropriate for back-side use. """ return _EmissionManager( - lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager, + lock, interfaces.Outcome.SERVICER_FAILURE, termination_manager, transmission_manager) diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/_ends.py similarity index 85% rename from src/python/src/grpc/framework/base/packets/_ends.py rename to src/python/src/grpc/framework/base/_ends.py index 614d1f666e0a68cf86596d1eab3bfd32621d08e2..176f3ac06e3d77e5f15d96258fe0456e1132764b 100644 --- a/src/python/src/grpc/framework/base/packets/_ends.py +++ b/src/python/src/grpc/framework/base/_ends.py @@ -27,32 +27,30 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Implementations of Fronts and Backs.""" +"""Implementations of FrontLinks and BackLinks.""" import collections import threading import uuid -# _interfaces and packets are referenced from specification in this module. -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _cancellation -from grpc.framework.base.packets import _context -from grpc.framework.base.packets import _emission -from grpc.framework.base.packets import _expiration -from grpc.framework.base.packets import _ingestion -from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import -from grpc.framework.base.packets import _reception -from grpc.framework.base.packets import _termination -from grpc.framework.base.packets import _transmission -from grpc.framework.base.packets import interfaces -from grpc.framework.base.packets import packets # pylint: disable=unused-import +# _interfaces is referenced from specification in this module. +from grpc.framework.base import _cancellation +from grpc.framework.base import _context +from grpc.framework.base import _emission +from grpc.framework.base import _expiration +from grpc.framework.base import _ingestion +from grpc.framework.base import _interfaces # pylint: disable=unused-import +from grpc.framework.base import _reception +from grpc.framework.base import _termination +from grpc.framework.base import _transmission +from grpc.framework.base import interfaces from grpc.framework.foundation import callable_util _IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' -class _EasyOperation(base_interfaces.Operation): - """A trivial implementation of base_interfaces.Operation.""" +class _EasyOperation(interfaces.Operation): + """A trivial implementation of interfaces.Operation.""" def __init__(self, emission_manager, context, cancellation_manager): """Constructor. @@ -60,7 +58,7 @@ class _EasyOperation(base_interfaces.Operation): Args: emission_manager: The _interfaces.EmissionManager for the operation that will accept values emitted by customer code. - context: The base_interfaces.OperationContext for use by the customer + context: The interfaces.OperationContext for use by the customer during the operation. cancellation_manager: The _interfaces.CancellationManager for the operation. @@ -88,7 +86,7 @@ class _Endlette(object): # indicates an in-progress fire-and-forget operation for which the customer # has chosen to ignore results. self._operations = {} - self._stats = {outcome: 0 for outcome in base_interfaces.Outcome} + self._stats = {outcome: 0 for outcome in interfaces.Outcome} self._idle_actions = [] def terminal_action(self, operation_id): @@ -152,9 +150,9 @@ def _front_operate( """Constructs objects necessary for front-side operation management. Args: - callback: A callable that accepts packets.FrontToBackPackets and delivers - them to the other side of the operation. Execution of this callable may - take any arbitrary length of time. + callback: A callable that accepts interfaces.FrontToBackTickets and + delivers them to the other side of the operation. Execution of this + callable may take any arbitrary length of time. work_pool: A thread pool in which to execute customer code. transmission_pool: A thread pool to use for transmitting to the other side of the operation. @@ -169,7 +167,7 @@ def _front_operate( complete: A boolean indicating whether or not additional payloads will be supplied by the customer. timeout: A length of time in seconds to allow for the operation. - subscription: A base_interfaces.ServicedSubscription describing the + subscription: A interfaces.ServicedSubscription describing the customer's interest in the results of the operation. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. May be None. @@ -188,7 +186,7 @@ def _front_operate( lock, transmission_pool, callback, operation_id, name, subscription.kind, trace_id, timeout, termination_manager) operation_context = _context.OperationContext( - lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE, + lock, operation_id, interfaces.Outcome.SERVICED_FAILURE, termination_manager, transmission_manager) emission_manager = _emission.front_emission_manager( lock, termination_manager, transmission_manager) @@ -216,7 +214,7 @@ def _front_operate( transmission_manager.inmit(payload, complete) - if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE: + if subscription.kind is interfaces.ServicedSubscription.Kind.NONE: returned_reception_manager = None else: returned_reception_manager = reception_manager @@ -226,8 +224,8 @@ def _front_operate( cancellation_manager) -class Front(interfaces.Front): - """An implementation of interfaces.Front.""" +class FrontLink(interfaces.FrontLink): + """An implementation of interfaces.FrontLink.""" def __init__(self, work_pool, transmission_pool, utility_pool): """Constructor. @@ -252,16 +250,16 @@ class Front(interfaces.Front): self._callback = rear_link.accept_front_to_back_ticket def operation_stats(self): - """See base_interfaces.End.operation_stats for specification.""" + """See interfaces.End.operation_stats for specification.""" return self._endlette.operation_stats() def add_idle_action(self, action): - """See base_interfaces.End.add_idle_action for specification.""" + """See interfaces.End.add_idle_action for specification.""" self._endlette.add_idle_action(action) def operate( self, name, payload, complete, timeout, subscription, trace_id): - """See base_interfaces.Front.operate for specification.""" + """See interfaces.Front.operate for specification.""" operation_id = uuid.uuid4() with self._endlette: management = _front_operate( @@ -278,7 +276,7 @@ class Front(interfaces.Front): with self._endlette: reception_manager = self._endlette.get_operation(ticket.operation_id) if reception_manager: - reception_manager.receive_packet(ticket) + reception_manager.receive_ticket(ticket) def _back_operate( @@ -291,16 +289,16 @@ def _back_operate( Args: servicer: An interfaces.Servicer for servicing operations. - callback: A callable that accepts packets.BackToFrontPackets and delivers - them to the other side of the operation. Execution of this callable may - take any arbitrary length of time. + callback: A callable that accepts interfaces.BackToFrontTickets and + delivers them to the other side of the operation. Execution of this + callable may take any arbitrary length of time. work_pool: A thread pool in which to execute customer code. transmission_pool: A thread pool to use for transmitting to the other side of the operation. utility_pool: A thread pool for utility tasks. termination_action: A no-arg behavior to be called upon operation completion. - ticket: The first packets.FrontToBackPacket received for the operation. + ticket: The first interfaces.FrontToBackTicket received for the operation. default_timeout: A length of time in seconds to be used as the default time alloted for a single operation. maximum_timeout: A length of time in seconds to be used as the maximum @@ -317,7 +315,7 @@ def _back_operate( lock, transmission_pool, callback, ticket.operation_id, termination_manager, ticket.subscription) operation_context = _context.OperationContext( - lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE, + lock, ticket.operation_id, interfaces.Outcome.SERVICER_FAILURE, termination_manager, transmission_manager) emission_manager = _emission.back_emission_manager( lock, termination_manager, transmission_manager) @@ -340,13 +338,13 @@ def _back_operate( ingestion_manager, expiration_manager) ingestion_manager.set_expiration_manager(expiration_manager) - reception_manager.receive_packet(ticket) + reception_manager.receive_ticket(ticket) return reception_manager -class Back(interfaces.Back): - """An implementation of interfaces.Back.""" +class BackLink(interfaces.BackLink): + """An implementation of interfaces.BackLink.""" def __init__( self, servicer, work_pool, transmission_pool, utility_pool, @@ -390,12 +388,12 @@ class Back(interfaces.Back): self._default_timeout, self._maximum_timeout) self._endlette.add_operation(ticket.operation_id, reception_manager) else: - reception_manager.receive_packet(ticket) + reception_manager.receive_ticket(ticket) def operation_stats(self): - """See base_interfaces.End.operation_stats for specification.""" + """See interfaces.End.operation_stats for specification.""" return self._endlette.operation_stats() def add_idle_action(self, action): - """See base_interfaces.End.add_idle_action for specification.""" + """See interfaces.End.add_idle_action for specification.""" self._endlette.add_idle_action(action) diff --git a/src/python/src/grpc/framework/base/packets/_expiration.py b/src/python/src/grpc/framework/base/_expiration.py similarity index 95% rename from src/python/src/grpc/framework/base/packets/_expiration.py rename to src/python/src/grpc/framework/base/_expiration.py index a9ecaeaa63c8335366458b9ee579877398d5fa2e..17acbef4c111489e5f8973e32911a07bc25624ba 100644 --- a/src/python/src/grpc/framework/base/packets/_expiration.py +++ b/src/python/src/grpc/framework/base/_expiration.py @@ -31,8 +31,8 @@ import time -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _interfaces +from grpc.framework.base import _interfaces +from grpc.framework.base import interfaces from grpc.framework.foundation import later @@ -73,8 +73,8 @@ class _ExpirationManager(_interfaces.ExpirationManager): with self._lock: if self._future is not None and index == self._index: self._future = None - self._termination_manager.abort(base_interfaces.Outcome.EXPIRED) - self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED) + self._termination_manager.abort(interfaces.Outcome.EXPIRED) + self._transmission_manager.abort(interfaces.Outcome.EXPIRED) self._ingestion_manager.abort() def start(self): diff --git a/src/python/src/grpc/framework/base/packets/_ingestion.py b/src/python/src/grpc/framework/base/_ingestion.py similarity index 99% rename from src/python/src/grpc/framework/base/packets/_ingestion.py rename to src/python/src/grpc/framework/base/_ingestion.py index c5c08fd98e4c5347151748d1b93b4304b54e6762..06d5b92f0b3fbcbd3ee4afd810a58afad22d4595 100644 --- a/src/python/src/grpc/framework/base/packets/_ingestion.py +++ b/src/python/src/grpc/framework/base/_ingestion.py @@ -32,11 +32,10 @@ import abc import collections +from grpc.framework.base import _constants +from grpc.framework.base import _interfaces from grpc.framework.base import exceptions from grpc.framework.base import interfaces -from grpc.framework.base.packets import _constants -from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets from grpc.framework.foundation import abandonment from grpc.framework.foundation import callable_util from grpc.framework.foundation import stream diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/_interfaces.py similarity index 91% rename from src/python/src/grpc/framework/base/packets/_interfaces.py rename to src/python/src/grpc/framework/base/_interfaces.py index 64184bdf7c93e554a4abab9501588ba072d0aa4e..d88cf76590e3039a4b22ff7efb3f0a5674b50a35 100644 --- a/src/python/src/grpc/framework/base/packets/_interfaces.py +++ b/src/python/src/grpc/framework/base/_interfaces.py @@ -31,9 +31,8 @@ import abc -# base_interfaces and packets are referenced from specification in this module. -from grpc.framework.base import interfaces as base_interfaces # pylint: disable=unused-import -from grpc.framework.base.packets import packets # pylint: disable=unused-import +# interfaces is referenced from specification in this module. +from grpc.framework.base import interfaces # pylint: disable=unused-import from grpc.framework.foundation import stream @@ -63,7 +62,7 @@ class TerminationManager(object): immediately. Args: - callback: A callable that will be passed a base_interfaces.Outcome value. + callback: A callable that will be passed an interfaces.Outcome value. """ raise NotImplementedError() @@ -87,7 +86,7 @@ class TerminationManager(object): """Indicates that the operation must abort for the indicated reason. Args: - outcome: A base_interfaces.Outcome indicating operation abortion. + outcome: An interfaces.Outcome indicating operation abortion. """ raise NotImplementedError() @@ -113,7 +112,7 @@ class TransmissionManager(object): """Indicates that the operation has aborted for the indicated reason. Args: - outcome: A base_interfaces.Outcome indicating operation abortion. + outcome: An interfaces.Outcome indicating operation abortion. """ raise NotImplementedError() @@ -248,15 +247,15 @@ class ExpirationManager(object): class ReceptionManager(object): - """A manager responsible for receiving packets from the other end.""" + """A manager responsible for receiving tickets from the other end.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def receive_packet(self, packet): - """Handle a packet from the other side of the operation. + def receive_ticket(self, ticket): + """Handle a ticket from the other side of the operation. Args: - packet: A packets.BackToFrontPacket or packets.FrontToBackPacket + ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket appropriate to this end of the operation and this object. """ raise NotImplementedError() diff --git a/src/python/src/grpc/framework/base/packets/_reception.py b/src/python/src/grpc/framework/base/_reception.py similarity index 57% rename from src/python/src/grpc/framework/base/packets/_reception.py rename to src/python/src/grpc/framework/base/_reception.py index ef10c7f8fe514feed7c6045a3982765a3bc06c39..dd428964f1558e1db8e443cc7ab371946c4c718d 100644 --- a/src/python/src/grpc/framework/base/packets/_reception.py +++ b/src/python/src/grpc/framework/base/_reception.py @@ -27,47 +27,46 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""State and behavior for packet reception.""" +"""State and behavior for ticket reception.""" import abc -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets +from grpc.framework.base import interfaces +from grpc.framework.base import _interfaces -_INITIAL_FRONT_TO_BACK_PACKET_KINDS = ( - packets.FrontToBackPacket.Kind.COMMENCEMENT, - packets.FrontToBackPacket.Kind.ENTIRE, +_INITIAL_FRONT_TO_BACK_TICKET_KINDS = ( + interfaces.FrontToBackTicket.Kind.COMMENCEMENT, + interfaces.FrontToBackTicket.Kind.ENTIRE, ) class _Receiver(object): - """Common specification of different packet-handling behavior.""" + """Common specification of different ticket-handling behavior.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def abort_if_abortive(self, packet): - """Aborts the operation if the packet is abortive. + def abort_if_abortive(self, ticket): + """Aborts the operation if the ticket is abortive. Args: - packet: A just-arrived packet. + ticket: A just-arrived ticket. Returns: A boolean indicating whether or not this Receiver aborted the operation - based on the packet. + based on the ticket. """ raise NotImplementedError() @abc.abstractmethod - def receive(self, packet): - """Handles a just-arrived packet. + def receive(self, ticket): + """Handles a just-arrived ticket. Args: - packet: A just-arrived packet. + ticket: A just-arrived ticket. Returns: - A boolean indicating whether or not the packet was terminal (i.e. whether - or not non-abortive packets are legal after this one). + A boolean indicating whether or not the ticket was terminal (i.e. whether + or not non-abortive tickets are legal after this one). """ raise NotImplementedError() @@ -88,15 +87,15 @@ def _abort( def _abort_if_abortive( - packet, abortive, termination_manager, transmission_manager, + ticket, abortive, termination_manager, transmission_manager, ingestion_manager, expiration_manager): - """Determines a packet's being abortive and if so aborts the operation. + """Determines a ticket's being abortive and if so aborts the operation. Args: - packet: A just-arrived packet. - abortive: A callable that takes a packet and returns a - base_interfaces.Outcome indicating that the operation should be aborted - or None indicating that the operation should not be aborted. + ticket: A just-arrived ticket. + abortive: A callable that takes a ticket and returns an interfaces.Outcome + indicating that the operation should be aborted or None indicating that + the operation should not be aborted. termination_manager: The operation's _interfaces.TerminationManager. transmission_manager: The operation's _interfaces.TransmissionManager. ingestion_manager: The operation's _interfaces.IngestionManager. @@ -105,7 +104,7 @@ def _abort_if_abortive( Returns: True if the operation was aborted; False otherwise. """ - abortion_outcome = abortive(packet) + abortion_outcome = abortive(ticket) if abortion_outcome is None: return False else: @@ -120,12 +119,12 @@ def _reception_failure( expiration_manager): """Aborts the operation with an indication of reception failure.""" _abort( - base_interfaces.Outcome.RECEPTION_FAILURE, termination_manager, + interfaces.Outcome.RECEPTION_FAILURE, termination_manager, transmission_manager, ingestion_manager, expiration_manager) class _BackReceiver(_Receiver): - """Packet-handling specific to the back side of an operation.""" + """Ticket-handling specific to the back side of an operation.""" def __init__( self, termination_manager, transmission_manager, ingestion_manager, @@ -143,68 +142,68 @@ class _BackReceiver(_Receiver): self._ingestion_manager = ingestion_manager self._expiration_manager = expiration_manager - self._first_packet_seen = False - self._last_packet_seen = False + self._first_ticket_seen = False + self._last_ticket_seen = False - def _abortive(self, packet): - """Determines whether or not (and if so, how) a packet is abortive. + def _abortive(self, ticket): + """Determines whether or not (and if so, how) a ticket is abortive. Args: - packet: A just-arrived packet. + ticket: A just-arrived ticket. Returns: - A base_interfaces.Outcome value describing operation abortion if the - packet is abortive or None if the packet is not abortive. + An interfaces.Outcome value describing operation abortion if the + ticket is abortive or None if the ticket is not abortive. """ - if packet.kind is packets.FrontToBackPacket.Kind.CANCELLATION: - return base_interfaces.Outcome.CANCELLED - elif packet.kind is packets.FrontToBackPacket.Kind.EXPIRATION: - return base_interfaces.Outcome.EXPIRED - elif packet.kind is packets.FrontToBackPacket.Kind.SERVICED_FAILURE: - return base_interfaces.Outcome.SERVICED_FAILURE - elif packet.kind is packets.FrontToBackPacket.Kind.RECEPTION_FAILURE: - return base_interfaces.Outcome.SERVICED_FAILURE - elif (packet.kind in _INITIAL_FRONT_TO_BACK_PACKET_KINDS and - self._first_packet_seen): - return base_interfaces.Outcome.RECEPTION_FAILURE - elif self._last_packet_seen: - return base_interfaces.Outcome.RECEPTION_FAILURE + if ticket.kind is interfaces.FrontToBackTicket.Kind.CANCELLATION: + return interfaces.Outcome.CANCELLED + elif ticket.kind is interfaces.FrontToBackTicket.Kind.EXPIRATION: + return interfaces.Outcome.EXPIRED + elif ticket.kind is interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE: + return interfaces.Outcome.SERVICED_FAILURE + elif ticket.kind is interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE: + return interfaces.Outcome.SERVICED_FAILURE + elif (ticket.kind in _INITIAL_FRONT_TO_BACK_TICKET_KINDS and + self._first_ticket_seen): + return interfaces.Outcome.RECEPTION_FAILURE + elif self._last_ticket_seen: + return interfaces.Outcome.RECEPTION_FAILURE else: return None - def abort_if_abortive(self, packet): + def abort_if_abortive(self, ticket): """See _Receiver.abort_if_abortive for specification.""" return _abort_if_abortive( - packet, self._abortive, self._termination_manager, + ticket, self._abortive, self._termination_manager, self._transmission_manager, self._ingestion_manager, self._expiration_manager) - def receive(self, packet): + def receive(self, ticket): """See _Receiver.receive for specification.""" - if packet.timeout is not None: - self._expiration_manager.change_timeout(packet.timeout) - - if packet.kind is packets.FrontToBackPacket.Kind.COMMENCEMENT: - self._first_packet_seen = True - self._ingestion_manager.start(packet.name) - if packet.payload is not None: - self._ingestion_manager.consume(packet.payload) - elif packet.kind is packets.FrontToBackPacket.Kind.CONTINUATION: - self._ingestion_manager.consume(packet.payload) - elif packet.kind is packets.FrontToBackPacket.Kind.COMPLETION: - self._last_packet_seen = True - if packet.payload is None: + if ticket.timeout is not None: + self._expiration_manager.change_timeout(ticket.timeout) + + if ticket.kind is interfaces.FrontToBackTicket.Kind.COMMENCEMENT: + self._first_ticket_seen = True + self._ingestion_manager.start(ticket.name) + if ticket.payload is not None: + self._ingestion_manager.consume(ticket.payload) + elif ticket.kind is interfaces.FrontToBackTicket.Kind.CONTINUATION: + self._ingestion_manager.consume(ticket.payload) + elif ticket.kind is interfaces.FrontToBackTicket.Kind.COMPLETION: + self._last_ticket_seen = True + if ticket.payload is None: self._ingestion_manager.terminate() else: - self._ingestion_manager.consume_and_terminate(packet.payload) + self._ingestion_manager.consume_and_terminate(ticket.payload) else: - self._first_packet_seen = True - self._last_packet_seen = True - self._ingestion_manager.start(packet.name) - if packet.payload is None: + self._first_ticket_seen = True + self._last_ticket_seen = True + self._ingestion_manager.start(ticket.name) + if ticket.payload is None: self._ingestion_manager.terminate() else: - self._ingestion_manager.consume_and_terminate(packet.payload) + self._ingestion_manager.consume_and_terminate(ticket.payload) def reception_failure(self): """See _Receiver.reception_failure for specification.""" @@ -214,7 +213,7 @@ class _BackReceiver(_Receiver): class _FrontReceiver(_Receiver): - """Packet-handling specific to the front side of an operation.""" + """Ticket-handling specific to the front side of an operation.""" def __init__( self, termination_manager, transmission_manager, ingestion_manager, @@ -232,48 +231,48 @@ class _FrontReceiver(_Receiver): self._ingestion_manager = ingestion_manager self._expiration_manager = expiration_manager - self._last_packet_seen = False + self._last_ticket_seen = False - def _abortive(self, packet): - """Determines whether or not (and if so, how) a packet is abortive. + def _abortive(self, ticket): + """Determines whether or not (and if so, how) a ticket is abortive. Args: - packet: A just-arrived packet. + ticket: A just-arrived ticket. Returns: - A base_interfaces.Outcome value describing operation abortion if the - packet is abortive or None if the packet is not abortive. + An interfaces.Outcome value describing operation abortion if the ticket + is abortive or None if the ticket is not abortive. """ - if packet.kind is packets.BackToFrontPacket.Kind.CANCELLATION: - return base_interfaces.Outcome.CANCELLED - elif packet.kind is packets.BackToFrontPacket.Kind.EXPIRATION: - return base_interfaces.Outcome.EXPIRED - elif packet.kind is packets.BackToFrontPacket.Kind.SERVICER_FAILURE: - return base_interfaces.Outcome.SERVICER_FAILURE - elif packet.kind is packets.BackToFrontPacket.Kind.RECEPTION_FAILURE: - return base_interfaces.Outcome.SERVICER_FAILURE - elif self._last_packet_seen: - return base_interfaces.Outcome.RECEPTION_FAILURE + if ticket.kind is interfaces.BackToFrontTicket.Kind.CANCELLATION: + return interfaces.Outcome.CANCELLED + elif ticket.kind is interfaces.BackToFrontTicket.Kind.EXPIRATION: + return interfaces.Outcome.EXPIRED + elif ticket.kind is interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE: + return interfaces.Outcome.SERVICER_FAILURE + elif ticket.kind is interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE: + return interfaces.Outcome.SERVICER_FAILURE + elif self._last_ticket_seen: + return interfaces.Outcome.RECEPTION_FAILURE else: return None - def abort_if_abortive(self, packet): + def abort_if_abortive(self, ticket): """See _Receiver.abort_if_abortive for specification.""" return _abort_if_abortive( - packet, self._abortive, self._termination_manager, + ticket, self._abortive, self._termination_manager, self._transmission_manager, self._ingestion_manager, self._expiration_manager) - def receive(self, packet): + def receive(self, ticket): """See _Receiver.receive for specification.""" - if packet.kind is packets.BackToFrontPacket.Kind.CONTINUATION: - self._ingestion_manager.consume(packet.payload) - elif packet.kind is packets.BackToFrontPacket.Kind.COMPLETION: - self._last_packet_seen = True - if packet.payload is None: + if ticket.kind is interfaces.BackToFrontTicket.Kind.CONTINUATION: + self._ingestion_manager.consume(ticket.payload) + elif ticket.kind is interfaces.BackToFrontTicket.Kind.COMPLETION: + self._last_ticket_seen = True + if ticket.payload is None: self._ingestion_manager.terminate() else: - self._ingestion_manager.consume_and_terminate(packet.payload) + self._ingestion_manager.consume_and_terminate(ticket.payload) def reception_failure(self): """See _Receiver.reception_failure for specification.""" @@ -290,72 +289,72 @@ class _ReceptionManager(_interfaces.ReceptionManager): Args: lock: The operation-servicing-wide lock object. - receiver: A _Receiver responsible for handling received packets. + receiver: A _Receiver responsible for handling received tickets. """ self._lock = lock self._receiver = receiver self._lowest_unseen_sequence_number = 0 - self._out_of_sequence_packets = {} + self._out_of_sequence_tickets = {} self._completed_sequence_number = None self._aborted = False - def _sequence_failure(self, packet): - """Determines a just-arrived packet's sequential legitimacy. + def _sequence_failure(self, ticket): + """Determines a just-arrived ticket's sequential legitimacy. Args: - packet: A just-arrived packet. + ticket: A just-arrived ticket. Returns: - True if the packet is sequentially legitimate; False otherwise. + True if the ticket is sequentially legitimate; False otherwise. """ - if packet.sequence_number < self._lowest_unseen_sequence_number: + if ticket.sequence_number < self._lowest_unseen_sequence_number: return True - elif packet.sequence_number in self._out_of_sequence_packets: + elif ticket.sequence_number in self._out_of_sequence_tickets: return True elif (self._completed_sequence_number is not None and - self._completed_sequence_number <= packet.sequence_number): + self._completed_sequence_number <= ticket.sequence_number): return True else: return False - def _process(self, packet): - """Process those packets ready to be processed. + def _process(self, ticket): + """Process those tickets ready to be processed. Args: - packet: A just-arrived packet the sequence number of which matches this + ticket: A just-arrived ticket the sequence number of which matches this _ReceptionManager's _lowest_unseen_sequence_number field. """ while True: - completed = self._receiver.receive(packet) + completed = self._receiver.receive(ticket) if completed: - self._out_of_sequence_packets.clear() - self._completed_sequence_number = packet.sequence_number - self._lowest_unseen_sequence_number = packet.sequence_number + 1 + self._out_of_sequence_tickets.clear() + self._completed_sequence_number = ticket.sequence_number + self._lowest_unseen_sequence_number = ticket.sequence_number + 1 return else: - next_packet = self._out_of_sequence_packets.pop( - packet.sequence_number + 1, None) - if next_packet is None: - self._lowest_unseen_sequence_number = packet.sequence_number + 1 + next_ticket = self._out_of_sequence_tickets.pop( + ticket.sequence_number + 1, None) + if next_ticket is None: + self._lowest_unseen_sequence_number = ticket.sequence_number + 1 return else: - packet = next_packet + ticket = next_ticket - def receive_packet(self, packet): - """See _interfaces.ReceptionManager.receive_packet for specification.""" + def receive_ticket(self, ticket): + """See _interfaces.ReceptionManager.receive_ticket for specification.""" with self._lock: if self._aborted: return - elif self._sequence_failure(packet): + elif self._sequence_failure(ticket): self._receiver.reception_failure() self._aborted = True - elif self._receiver.abort_if_abortive(packet): + elif self._receiver.abort_if_abortive(ticket): self._aborted = True - elif packet.sequence_number == self._lowest_unseen_sequence_number: - self._process(packet) + elif ticket.sequence_number == self._lowest_unseen_sequence_number: + self._process(ticket) else: - self._out_of_sequence_packets[packet.sequence_number] = packet + self._out_of_sequence_tickets[ticket.sequence_number] = ticket def front_reception_manager( diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/_termination.py similarity index 98% rename from src/python/src/grpc/framework/base/packets/_termination.py rename to src/python/src/grpc/framework/base/_termination.py index 6afba88fc41a43906e93af19eb60f59e51451863..ddcbc60293d010c82dda7266cd25edc47590123e 100644 --- a/src/python/src/grpc/framework/base/packets/_termination.py +++ b/src/python/src/grpc/framework/base/_termination.py @@ -31,9 +31,9 @@ import enum +from grpc.framework.base import _constants +from grpc.framework.base import _interfaces from grpc.framework.base import interfaces -from grpc.framework.base.packets import _constants -from grpc.framework.base.packets import _interfaces from grpc.framework.foundation import callable_util _CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' diff --git a/src/python/src/grpc/framework/base/packets/_transmission.py b/src/python/src/grpc/framework/base/_transmission.py similarity index 68% rename from src/python/src/grpc/framework/base/packets/_transmission.py rename to src/python/src/grpc/framework/base/_transmission.py index 1b18204ec5187ddeb102d037be905d61b3312f1a..684512923443a8ca4e85ec5989762f77180cdd04 100644 --- a/src/python/src/grpc/framework/base/packets/_transmission.py +++ b/src/python/src/grpc/framework/base/_transmission.py @@ -27,14 +27,13 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""State and behavior for packet transmission during an operation.""" +"""State and behavior for ticket transmission during an operation.""" import abc +from grpc.framework.base import _constants +from grpc.framework.base import _interfaces from grpc.framework.base import interfaces -from grpc.framework.base.packets import _constants -from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets from grpc.framework.foundation import callable_util _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' @@ -47,53 +46,53 @@ _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = ( interfaces.Outcome.SERVICED_FAILURE, ) -_ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND = { +_ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = { interfaces.Outcome.CANCELLED: - packets.FrontToBackPacket.Kind.CANCELLATION, + interfaces.FrontToBackTicket.Kind.CANCELLATION, interfaces.Outcome.EXPIRED: - packets.FrontToBackPacket.Kind.EXPIRATION, + interfaces.FrontToBackTicket.Kind.EXPIRATION, interfaces.Outcome.RECEPTION_FAILURE: - packets.FrontToBackPacket.Kind.RECEPTION_FAILURE, + interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE, interfaces.Outcome.TRANSMISSION_FAILURE: - packets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, + interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, interfaces.Outcome.SERVICED_FAILURE: - packets.FrontToBackPacket.Kind.SERVICED_FAILURE, + interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE, interfaces.Outcome.SERVICER_FAILURE: - packets.FrontToBackPacket.Kind.SERVICER_FAILURE, + interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE, } -_ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND = { +_ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = { interfaces.Outcome.CANCELLED: - packets.BackToFrontPacket.Kind.CANCELLATION, + interfaces.BackToFrontTicket.Kind.CANCELLATION, interfaces.Outcome.EXPIRED: - packets.BackToFrontPacket.Kind.EXPIRATION, + interfaces.BackToFrontTicket.Kind.EXPIRATION, interfaces.Outcome.RECEPTION_FAILURE: - packets.BackToFrontPacket.Kind.RECEPTION_FAILURE, + interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE, interfaces.Outcome.TRANSMISSION_FAILURE: - packets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, + interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, interfaces.Outcome.SERVICED_FAILURE: - packets.BackToFrontPacket.Kind.SERVICED_FAILURE, + interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE, interfaces.Outcome.SERVICER_FAILURE: - packets.BackToFrontPacket.Kind.SERVICER_FAILURE, + interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE, } -class _Packetizer(object): - """Common specification of different packet-creating behavior.""" +class _Ticketizer(object): + """Common specification of different ticket-creating behavior.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def packetize(self, operation_id, sequence_number, payload, complete): - """Creates a packet indicating ordinary operation progress. + def ticketize(self, operation_id, sequence_number, payload, complete): + """Creates a ticket indicating ordinary operation progress. Args: operation_id: The operation ID for the current operation. - sequence_number: A sequence number for the packet. + sequence_number: A sequence number for the ticket. payload: A customer payload object. May be None if sequence_number is zero or complete is true. - complete: A boolean indicating whether or not the packet should describe + complete: A boolean indicating whether or not the ticket should describe itself as (but for a later indication of operation abortion) the last - packet to be sent. + ticket to be sent. Returns: An object of an appropriate type suitable for transmission to the other @@ -102,12 +101,12 @@ class _Packetizer(object): raise NotImplementedError() @abc.abstractmethod - def packetize_abortion(self, operation_id, sequence_number, outcome): - """Creates a packet indicating that the operation is aborted. + def ticketize_abortion(self, operation_id, sequence_number, outcome): + """Creates a ticket indicating that the operation is aborted. Args: operation_id: The operation ID for the current operation. - sequence_number: A sequence number for the packet. + sequence_number: A sequence number for the ticket. outcome: An interfaces.Outcome value describing the operation abortion. Returns: @@ -118,8 +117,8 @@ class _Packetizer(object): raise NotImplementedError() -class _FrontPacketizer(_Packetizer): - """Front-side packet-creating behavior.""" +class _FrontTicketizer(_Ticketizer): + """Front-side ticket-creating behavior.""" def __init__(self, name, subscription_kind, trace_id, timeout): """Constructor. @@ -127,7 +126,7 @@ class _FrontPacketizer(_Packetizer): Args: name: The name of the operation. subscription_kind: An interfaces.ServicedSubscription.Kind value - describing the interest the front has in packets sent from the back. + describing the interest the front has in tickets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. @@ -137,54 +136,54 @@ class _FrontPacketizer(_Packetizer): self._trace_id = trace_id self._timeout = timeout - def packetize(self, operation_id, sequence_number, payload, complete): - """See _Packetizer.packetize for specification.""" + def ticketize(self, operation_id, sequence_number, payload, complete): + """See _Ticketizer.ticketize for specification.""" if sequence_number: if complete: - kind = packets.FrontToBackPacket.Kind.COMPLETION + kind = interfaces.FrontToBackTicket.Kind.COMPLETION else: - kind = packets.FrontToBackPacket.Kind.CONTINUATION - return packets.FrontToBackPacket( + kind = interfaces.FrontToBackTicket.Kind.CONTINUATION + return interfaces.FrontToBackTicket( operation_id, sequence_number, kind, self._name, self._subscription_kind, self._trace_id, payload, self._timeout) else: if complete: - kind = packets.FrontToBackPacket.Kind.ENTIRE + kind = interfaces.FrontToBackTicket.Kind.ENTIRE else: - kind = packets.FrontToBackPacket.Kind.COMMENCEMENT - return packets.FrontToBackPacket( + kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT + return interfaces.FrontToBackTicket( operation_id, 0, kind, self._name, self._subscription_kind, self._trace_id, payload, self._timeout) - def packetize_abortion(self, operation_id, sequence_number, outcome): - """See _Packetizer.packetize_abortion for specification.""" + def ticketize_abortion(self, operation_id, sequence_number, outcome): + """See _Ticketizer.ticketize_abortion for specification.""" if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES: return None else: - kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND[outcome] - return packets.FrontToBackPacket( + kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome] + return interfaces.FrontToBackTicket( operation_id, sequence_number, kind, None, None, None, None, None) -class _BackPacketizer(_Packetizer): - """Back-side packet-creating behavior.""" +class _BackTicketizer(_Ticketizer): + """Back-side ticket-creating behavior.""" - def packetize(self, operation_id, sequence_number, payload, complete): - """See _Packetizer.packetize for specification.""" + def ticketize(self, operation_id, sequence_number, payload, complete): + """See _Ticketizer.ticketize for specification.""" if complete: - kind = packets.BackToFrontPacket.Kind.COMPLETION + kind = interfaces.BackToFrontTicket.Kind.COMPLETION else: - kind = packets.BackToFrontPacket.Kind.CONTINUATION - return packets.BackToFrontPacket( + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION + return interfaces.BackToFrontTicket( operation_id, sequence_number, kind, payload) - def packetize_abortion(self, operation_id, sequence_number, outcome): - """See _Packetizer.packetize_abortion for specification.""" + def ticketize_abortion(self, operation_id, sequence_number, outcome): + """See _Ticketizer.ticketize_abortion for specification.""" if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES: return None else: - kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND[outcome] - return packets.BackToFrontPacket( + kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome] + return interfaces.BackToFrontTicket( operation_id, sequence_number, kind, None) @@ -221,21 +220,21 @@ class _EmptyTransmissionManager(TransmissionManager): class _TransmittingTransmissionManager(TransmissionManager): - """A TransmissionManager implementation that sends packets.""" + """A TransmissionManager implementation that sends tickets.""" def __init__( - self, lock, pool, callback, operation_id, packetizer, + self, lock, pool, callback, operation_id, ticketizer, termination_manager): """Constructor. Args: lock: The operation-servicing-wide lock object. - pool: A thread pool in which the work of transmitting packets will be + pool: A thread pool in which the work of transmitting tickets will be performed. - callback: A callable that accepts packets and sends them to the other side + callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. - packetizer: A _Packetizer for packet creation. + ticketizer: A _Ticketizer for ticket creation. termination_manager: The _interfaces.TerminationManager associated with this operation. """ @@ -243,7 +242,7 @@ class _TransmittingTransmissionManager(TransmissionManager): self._pool = pool self._callback = callback self._operation_id = operation_id - self._packetizer = packetizer + self._ticketizer = ticketizer self._termination_manager = termination_manager self._ingestion_manager = None self._expiration_manager = None @@ -260,8 +259,8 @@ class _TransmittingTransmissionManager(TransmissionManager): self._ingestion_manager = ingestion_manager self._expiration_manager = expiration_manager - def _lead_packet(self, emission, complete): - """Creates a packet suitable for leading off the transmission loop. + def _lead_ticket(self, emission, complete): + """Creates a ticket suitable for leading off the transmission loop. Args: emission: A customer payload object to be sent to the other side of the @@ -270,37 +269,37 @@ class _TransmittingTransmissionManager(TransmissionManager): the passed object. Returns: - A packet with which to lead off the transmission loop. + A ticket with which to lead off the transmission loop. """ sequence_number = self._lowest_unused_sequence_number self._lowest_unused_sequence_number += 1 - return self._packetizer.packetize( + return self._ticketizer.ticketize( self._operation_id, sequence_number, emission, complete) - def _abortive_response_packet(self, outcome): - """Creates a packet indicating operation abortion. + def _abortive_response_ticket(self, outcome): + """Creates a ticket indicating operation abortion. Args: outcome: An interfaces.Outcome value describing operation abortion. Returns: - A packet indicating operation abortion. + A ticket indicating operation abortion. """ - packet = self._packetizer.packetize_abortion( + ticket = self._ticketizer.ticketize_abortion( self._operation_id, self._lowest_unused_sequence_number, outcome) - if packet is None: + if ticket is None: return None else: self._lowest_unused_sequence_number += 1 - return packet + return ticket - def _next_packet(self): - """Creates the next packet to be sent to the other side of the operation. + def _next_ticket(self): + """Creates the next ticket to be sent to the other side of the operation. Returns: - A (completed, packet) tuple comprised of a boolean indicating whether or - not the sequence of packets has completed normally and a packet to send - to the other side if the sequence of packets hasn't completed. The tuple + A (completed, ticket) tuple comprised of a boolean indicating whether or + not the sequence of tickets has completed normally and a ticket to send + to the other side if the sequence of tickets hasn't completed. The tuple will never have both a True first element and a non-None second element. """ if self._emissions is None: @@ -311,29 +310,29 @@ class _TransmittingTransmissionManager(TransmissionManager): complete = self._emission_complete and not self._emissions sequence_number = self._lowest_unused_sequence_number self._lowest_unused_sequence_number += 1 - return complete, self._packetizer.packetize( + return complete, self._ticketizer.ticketize( self._operation_id, sequence_number, payload, complete) else: return self._emission_complete, None else: - packet = self._abortive_response_packet(self._outcome) + ticket = self._abortive_response_ticket(self._outcome) self._emissions = None - return False, None if packet is None else packet + return False, None if ticket is None else ticket - def _transmit(self, packet): - """Commences the transmission loop sending packets. + def _transmit(self, ticket): + """Commences the transmission loop sending tickets. Args: - packet: A packet to be sent to the other side of the operation. + ticket: A ticket to be sent to the other side of the operation. """ - def transmit(packet): + def transmit(ticket): while True: transmission_outcome = callable_util.call_logging_exceptions( - self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet) + self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) if transmission_outcome.exception is None: with self._lock: - complete, packet = self._next_packet() - if packet is None: + complete, ticket = self._next_ticket() + if ticket is None: if complete: self._termination_manager.transmission_complete() self._transmitting = False @@ -349,7 +348,7 @@ class _TransmittingTransmissionManager(TransmissionManager): return self._pool.submit(callable_util.with_exceptions_logged( - transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet) + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) self._transmitting = True def inmit(self, emission, complete): @@ -359,17 +358,17 @@ class _TransmittingTransmissionManager(TransmissionManager): if self._transmitting: self._emissions.append(emission) else: - self._transmit(self._lead_packet(emission, complete)) + self._transmit(self._lead_ticket(emission, complete)) def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" if self._emissions is not None and self._outcome is None: self._outcome = outcome if not self._transmitting: - packet = self._abortive_response_packet(outcome) + ticket = self._abortive_response_ticket(outcome) self._emissions = None - if packet is not None: - self._transmit(packet) + if ticket is not None: + self._transmit(ticket) def front_transmission_manager( @@ -379,14 +378,14 @@ def front_transmission_manager( Args: lock: The operation-servicing-wide lock object. - pool: A thread pool in which the work of transmitting packets will be + pool: A thread pool in which the work of transmitting tickets will be performed. - callback: A callable that accepts packets and sends them to the other side + callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. name: The name of the operation. subscription_kind: An interfaces.ServicedSubscription.Kind value - describing the interest the front has in packets sent from the back. + describing the interest the front has in tickets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. @@ -397,7 +396,7 @@ def front_transmission_manager( A TransmissionManager appropriate for front-side use. """ return _TransmittingTransmissionManager( - lock, pool, callback, operation_id, _FrontPacketizer( + lock, pool, callback, operation_id, _FrontTicketizer( name, subscription_kind, trace_id, timeout), termination_manager) @@ -409,15 +408,15 @@ def back_transmission_manager( Args: lock: The operation-servicing-wide lock object. - pool: A thread pool in which the work of transmitting packets will be + pool: A thread pool in which the work of transmitting tickets will be performed. - callback: A callable that accepts packets and sends them to the other side + callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. termination_manager: The _interfaces.TerminationManager associated with this operation. subscription_kind: An interfaces.ServicedSubscription.Kind value - describing the interest the front has in packets sent from the back. + describing the interest the front has in tickets sent from the back. Returns: A TransmissionManager appropriate for back-side use. @@ -426,5 +425,5 @@ def back_transmission_manager( return _EmptyTransmissionManager() else: return _TransmittingTransmissionManager( - lock, pool, callback, operation_id, _BackPacketizer(), + lock, pool, callback, operation_id, _BackTicketizer(), termination_manager) diff --git a/src/python/src/grpc/framework/base/packets/implementations.py b/src/python/src/grpc/framework/base/implementations.py similarity index 75% rename from src/python/src/grpc/framework/base/packets/implementations.py rename to src/python/src/grpc/framework/base/implementations.py index 28688bcc0f98de016fac9bc56bba4bd4a9bec52e..5656f9f9812fa7b6c34dd9cc0325263afa08ae73 100644 --- a/src/python/src/grpc/framework/base/packets/implementations.py +++ b/src/python/src/grpc/framework/base/implementations.py @@ -27,51 +27,51 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Entry points into the packet-exchange-based implementation the base layer.""" +"""Entry points into the ticket-exchange-based base layer implementation.""" # interfaces is referenced from specification in this module. -from grpc.framework.base.packets import _ends -from grpc.framework.base.packets import interfaces # pylint: disable=unused-import +from grpc.framework.base import _ends +from grpc.framework.base import interfaces # pylint: disable=unused-import -def front(work_pool, transmission_pool, utility_pool): - """Factory function for creating interfaces.Fronts. +def front_link(work_pool, transmission_pool, utility_pool): + """Factory function for creating interfaces.FrontLinks. Args: - work_pool: A thread pool to be used for doing work within the created Front - object. - transmission_pool: A thread pool to be used within the created Front object - for transmitting values to some Back object. - utility_pool: A thread pool to be used within the created Front object for - utility tasks. + work_pool: A thread pool to be used for doing work within the created + FrontLink object. + transmission_pool: A thread pool to be used within the created FrontLink + object for transmitting values to a joined RearLink object. + utility_pool: A thread pool to be used within the created FrontLink object + for utility tasks. Returns: - An interfaces.Front. + An interfaces.FrontLink. """ - return _ends.Front(work_pool, transmission_pool, utility_pool) + return _ends.FrontLink(work_pool, transmission_pool, utility_pool) -def back( +def back_link( servicer, work_pool, transmission_pool, utility_pool, default_timeout, maximum_timeout): - """Factory function for creating interfaces.Backs. + """Factory function for creating interfaces.BackLinks. Args: servicer: An interfaces.Servicer for servicing operations. - work_pool: A thread pool to be used for doing work within the created Back - object. - transmission_pool: A thread pool to be used within the created Back object - for transmitting values to some Front object. - utility_pool: A thread pool to be used within the created Back object for - utility tasks. + work_pool: A thread pool to be used for doing work within the created + BackLink object. + transmission_pool: A thread pool to be used within the created BackLink + object for transmitting values to a joined ForeLink object. + utility_pool: A thread pool to be used within the created BackLink object + for utility tasks. default_timeout: A length of time in seconds to be used as the default time alloted for a single operation. maximum_timeout: A length of time in seconds to be used as the maximum time alloted for a single operation. Returns: - An interfaces.Back. + An interfaces.BackLink. """ - return _ends.Back( + return _ends.BackLink( servicer, work_pool, transmission_pool, utility_pool, default_timeout, maximum_timeout) diff --git a/src/python/src/grpc/framework/base/packets/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py similarity index 94% rename from src/python/src/grpc/framework/base/packets/implementations_test.py rename to src/python/src/grpc/framework/base/implementations_test.py index e5855700c727ca9bfebba494809286602e2f110c..11e49caf756dc46363feeee86042cc69b5c66905 100644 --- a/src/python/src/grpc/framework/base/packets/implementations_test.py +++ b/src/python/src/grpc/framework/base/implementations_test.py @@ -27,13 +27,13 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Tests for _framework.base.packets.implementations.""" +"""Tests for grpc.framework.base.implementations.""" import unittest +from grpc.framework.base import implementations from grpc.framework.base import interfaces_test_case from grpc.framework.base import util -from grpc.framework.base.packets import implementations from grpc.framework.foundation import logging_pool POOL_MAX_WORKERS = 100 @@ -54,10 +54,10 @@ class ImplementationsTest( self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS) self.test_pool = logging_pool.pool(POOL_MAX_WORKERS) self.test_servicer = interfaces_test_case.TestServicer(self.test_pool) - self.front = implementations.front( + self.front = implementations.front_link( self.front_work_pool, self.front_transmission_pool, self.front_utility_pool) - self.back = implementations.back( + self.back = implementations.back_link( self.test_servicer, self.back_work_pool, self.back_transmission_pool, self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT) self.front.join_rear_link(self.back) diff --git a/src/python/src/grpc/framework/base/packets/in_memory.py b/src/python/src/grpc/framework/base/in_memory.py similarity index 95% rename from src/python/src/grpc/framework/base/packets/in_memory.py rename to src/python/src/grpc/framework/base/in_memory.py index 453fd3b38aaa618eeb093ab1d50e3995714068bf..c92d0bc663c557c984d24df575f104caa66bba3b 100644 --- a/src/python/src/grpc/framework/base/packets/in_memory.py +++ b/src/python/src/grpc/framework/base/in_memory.py @@ -27,12 +27,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Entry points into the packet-exchange-based implementation the base layer.""" +"""In-memory implementations of base layer interfaces.""" import threading -from grpc.framework.base.packets import _constants -from grpc.framework.base.packets import interfaces +from grpc.framework.base import _constants +from grpc.framework.base import interfaces from grpc.framework.foundation import callable_util diff --git a/src/python/src/grpc/framework/base/interfaces.py b/src/python/src/grpc/framework/base/interfaces.py index ed43b253fe5e1240a6cc8780135d5e92978943a0..e22c10d9750625b9e34590dc6275d5d4ceeee5cd 100644 --- a/src/python/src/grpc/framework/base/interfaces.py +++ b/src/python/src/grpc/framework/base/interfaces.py @@ -30,6 +30,7 @@ """Interfaces defined and used by the base layer of RPC Framework.""" import abc +import collections import enum # stream is referenced from specification in this module. @@ -230,3 +231,133 @@ class Front(End): class Back(End): """Serverish objects that perform the work of operations.""" __metaclass__ = abc.ABCMeta + + +class FrontToBackTicket( + collections.namedtuple( + 'FrontToBackTicket', + ['operation_id', 'sequence_number', 'kind', 'name', 'subscription', + 'trace_id', 'payload', 'timeout'])): + """A sum type for all values sent from a front to a back. + + Attributes: + operation_id: A unique-with-respect-to-equality hashable object identifying + a particular operation. + sequence_number: A zero-indexed integer sequence number identifying the + ticket's place among all the tickets sent from front to back for this + particular operation. Must be zero if kind is Kind.COMMENCEMENT or + Kind.ENTIRE. Must be positive for any other kind. + kind: A Kind value describing the overall kind of ticket. + name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT + or Kind.ENTIRE. Must be None for any other kind. + subscription: An ServicedSubscription.Kind value describing the interest + the front has in tickets sent from the back. Must be present if + kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind. + trace_id: A uuid.UUID identifying a set of related operations to which this + operation belongs. May be None. + payload: A customer payload object. Must be present if kind is + Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None + for any other kind. + timeout: An optional length of time (measured from the beginning of the + operation) to allow for the entire operation. If None, a default value on + the back will be used. If present and excessively large, the back may + limit the operation to a smaller duration of its choice. May be present + for any ticket kind; setting a value on a later ticket allows fronts + to request time extensions (or even time reductions!) on in-progress + operations. + """ + + @enum.unique + class Kind(enum.Enum): + """Identifies the overall kind of a FrontToBackTicket.""" + + COMMENCEMENT = 'commencement' + CONTINUATION = 'continuation' + COMPLETION = 'completion' + ENTIRE = 'entire' + CANCELLATION = 'cancellation' + EXPIRATION = 'expiration' + SERVICER_FAILURE = 'servicer failure' + SERVICED_FAILURE = 'serviced failure' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + + +class BackToFrontTicket( + collections.namedtuple( + 'BackToFrontTicket', + ['operation_id', 'sequence_number', 'kind', 'payload'])): + """A sum type for all values sent from a back to a front. + + Attributes: + operation_id: A unique-with-respect-to-equality hashable object identifying + a particular operation. + sequence_number: A zero-indexed integer sequence number identifying the + ticket's place among all the tickets sent from back to front for this + particular operation. + kind: A Kind value describing the overall kind of ticket. + payload: A customer payload object. Must be present if kind is + Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None + otherwise. + """ + + @enum.unique + class Kind(enum.Enum): + """Identifies the overall kind of a BackToFrontTicket.""" + + CONTINUATION = 'continuation' + COMPLETION = 'completion' + CANCELLATION = 'cancellation' + EXPIRATION = 'expiration' + SERVICER_FAILURE = 'servicer failure' + SERVICED_FAILURE = 'serviced failure' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + + +class ForeLink(object): + """Accepts back-to-front tickets and emits front-to-back tickets.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def accept_back_to_front_ticket(self, ticket): + """Accept a BackToFrontTicket. + + Args: + ticket: Any BackToFrontTicket. + """ + raise NotImplementedError() + + @abc.abstractmethod + def join_rear_link(self, rear_link): + """Mates this object with a peer with which it will exchange tickets.""" + raise NotImplementedError() + + +class RearLink(object): + """Accepts front-to-back tickets and emits back-to-front tickets.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def accept_front_to_back_ticket(self, ticket): + """Accepts a FrontToBackTicket. + + Args: + ticket: Any FrontToBackTicket. + """ + raise NotImplementedError() + + @abc.abstractmethod + def join_fore_link(self, fore_link): + """Mates this object with a peer with which it will exchange tickets.""" + raise NotImplementedError() + + +class FrontLink(Front, ForeLink): + """Clientish objects that operate by sending and receiving tickets.""" + __metaclass__ = abc.ABCMeta + + +class BackLink(Back, RearLink): + """Serverish objects that operate by sending and receiving tickets.""" + __metaclass__ = abc.ABCMeta diff --git a/src/python/src/grpc/framework/base/interfaces_test_case.py b/src/python/src/grpc/framework/base/interfaces_test_case.py index b86011c449efe4cdaef7f707f2bf528afcf2412c..dec10c2924d5cccb50cd630c25596ef909d4558d 100644 --- a/src/python/src/grpc/framework/base/interfaces_test_case.py +++ b/src/python/src/grpc/framework/base/interfaces_test_case.py @@ -164,7 +164,7 @@ class FrontAndBackTest(object): # pylint: disable=invalid-name def testSimplestCall(self): - """Tests the absolute simplest call - a one-packet fire-and-forget.""" + """Tests the absolute simplest call - a one-ticket fire-and-forget.""" self.front.operate( SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT, util.none_serviced_subscription(), 'test trace ID') @@ -175,25 +175,25 @@ class FrontAndBackTest(object): # Assuming nothing really pathological (such as pauses on the order of # SMALL_TIMEOUT interfering with this test) there are a two different ways # the back could have experienced execution up to this point: - # (1) The packet is still either in the front waiting to be transmitted + # (1) The ticket is still either in the front waiting to be transmitted # or is somewhere on the link between the front and the back. The back has # no idea that this test is even happening. Calling wait_for_idle on it # would do no good because in this case the back is idle and the call would - # return with the packet bound for it still in the front or on the link. + # return with the ticket bound for it still in the front or on the link. back_operation_stats = self.back.operation_stats() first_back_possibility = EMPTY_OUTCOME_DICT - # (2) The packet arrived at the back and the back completed the operation. + # (2) The ticket arrived at the back and the back completed the operation. second_back_possibility = dict(EMPTY_OUTCOME_DICT) second_back_possibility[interfaces.Outcome.COMPLETED] = 1 self.assertIn( back_operation_stats, (first_back_possibility, second_back_possibility)) - # It's true that if the packet had arrived at the back and the back had + # It's true that if the ticket had arrived at the back and the back had # begun processing that wait_for_idle could hold test execution until the # back completed the operation, but that doesn't really collapse the # possibility space down to one solution. def testEntireEcho(self): - """Tests a very simple one-packet-each-way round-trip.""" + """Tests a very simple one-ticket-each-way round-trip.""" test_payload = 'test payload' test_consumer = stream_testing.TestConsumer() subscription = util.full_serviced_subscription( @@ -212,7 +212,7 @@ class FrontAndBackTest(object): self.assertListEqual([(test_payload, True)], test_consumer.calls) def testBidirectionalStreamingEcho(self): - """Tests sending multiple packets each way.""" + """Tests sending multiple tickets each way.""" test_payload_template = 'test_payload: %03d' test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)] test_consumer = stream_testing.TestConsumer() @@ -255,16 +255,16 @@ class FrontAndBackTest(object): # Assuming nothing really pathological (such as pauses on the order of # SMALL_TIMEOUT interfering with this test) there are a two different ways # the back could have experienced execution up to this point: - # (1) Both packets are still either in the front waiting to be transmitted + # (1) Both tickets are still either in the front waiting to be transmitted # or are somewhere on the link between the front and the back. The back has # no idea that this test is even happening. Calling wait_for_idle on it # would do no good because in this case the back is idle and the call would - # return with the packets bound for it still in the front or on the link. + # return with the tickets bound for it still in the front or on the link. back_operation_stats = self.back.operation_stats() first_back_possibility = EMPTY_OUTCOME_DICT - # (2) Both packets arrived within SMALL_TIMEOUT of one another at the back. - # The back started processing based on the first packet and then stopped - # upon receiving the cancellation packet. + # (2) Both tickets arrived within SMALL_TIMEOUT of one another at the back. + # The back started processing based on the first ticket and then stopped + # upon receiving the cancellation ticket. second_back_possibility = dict(EMPTY_OUTCOME_DICT) second_back_possibility[interfaces.Outcome.CANCELLED] = 1 self.assertIn( diff --git a/src/python/src/grpc/framework/base/packets/null.py b/src/python/src/grpc/framework/base/null.py similarity index 97% rename from src/python/src/grpc/framework/base/packets/null.py rename to src/python/src/grpc/framework/base/null.py index 5a2121243bf3e97ed9ce50fb95b103b0fa0f1b14..1e30d4557b1f4608537f89ff809a2e66d7b2d73e 100644 --- a/src/python/src/grpc/framework/base/packets/null.py +++ b/src/python/src/grpc/framework/base/null.py @@ -29,7 +29,7 @@ """Null links that ignore tickets passed to them.""" -from grpc.framework.base.packets import interfaces +from grpc.framework.base import interfaces class _NullForeLink(interfaces.ForeLink): diff --git a/src/python/src/grpc/framework/base/packets/interfaces.py b/src/python/src/grpc/framework/base/packets/interfaces.py deleted file mode 100644 index 7c48956ba59a2c8e19e7dd76d622eff679d54978..0000000000000000000000000000000000000000 --- a/src/python/src/grpc/framework/base/packets/interfaces.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Interfaces defined and used by the base layer of RPC Framework.""" - -import abc - -# packets is referenced from specifications in this module. -from grpc.framework.base import interfaces -from grpc.framework.base.packets import packets # pylint: disable=unused-import - - -class ForeLink(object): - """Accepts back-to-front tickets and emits front-to-back tickets.""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def accept_back_to_front_ticket(self, ticket): - """Accept a packets.BackToFrontPacket. - - Args: - ticket: Any packets.BackToFrontPacket. - """ - raise NotImplementedError() - - @abc.abstractmethod - def join_rear_link(self, rear_link): - """Mates this object with a peer with which it will exchange tickets.""" - raise NotImplementedError() - - -class RearLink(object): - """Accepts front-to-back tickets and emits back-to-front tickets.""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def accept_front_to_back_ticket(self, ticket): - """Accepts a packets.FrontToBackPacket. - - Args: - ticket: Any packets.FrontToBackPacket. - """ - raise NotImplementedError() - - @abc.abstractmethod - def join_fore_link(self, fore_link): - """Mates this object with a peer with which it will exchange tickets.""" - raise NotImplementedError() - - -class Front(ForeLink, interfaces.Front): - """Clientish objects that operate by sending and receiving tickets.""" - __metaclass__ = abc.ABCMeta - - -class Back(RearLink, interfaces.Back): - """Serverish objects that operate by sending and receiving tickets.""" - __metaclass__ = abc.ABCMeta diff --git a/src/python/src/grpc/framework/base/packets/packets.py b/src/python/src/grpc/framework/base/packets/packets.py deleted file mode 100644 index 1b140481f0ec20e2a0bd7e01e70e0e4594487b3a..0000000000000000000000000000000000000000 --- a/src/python/src/grpc/framework/base/packets/packets.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Packets used between fronts and backs.""" - -import collections -import enum - -# interfaces is referenced from specifications in this module. -from grpc.framework.base import interfaces # pylint: disable=unused-import - - -class FrontToBackPacket( - collections.namedtuple( - 'FrontToBackPacket', - ['operation_id', 'sequence_number', 'kind', 'name', 'subscription', - 'trace_id', 'payload', 'timeout'])): - """A sum type for all values sent from a front to a back. - - Attributes: - operation_id: A unique-with-respect-to-equality hashable object identifying - a particular operation. - sequence_number: A zero-indexed integer sequence number identifying the - packet's place among all the packets sent from front to back for this - particular operation. Must be zero if kind is Kind.COMMENCEMENT or - Kind.ENTIRE. Must be positive for any other kind. - kind: A Kind value describing the overall kind of ticket. - name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT - or Kind.ENTIRE. Must be None for any other kind. - subscription: An interfaces.ServicedSubscription.Kind value describing the - interest the front has in packets sent from the back. Must be present if - kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind. - trace_id: A uuid.UUID identifying a set of related operations to which this - operation belongs. May be None. - payload: A customer payload object. Must be present if kind is - Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None - for any other kind. - timeout: An optional length of time (measured from the beginning of the - operation) to allow for the entire operation. If None, a default value on - the back will be used. If present and excessively large, the back may - limit the operation to a smaller duration of its choice. May be present - for any ticket kind; setting a value on a later ticket allows fronts - to request time extensions (or even time reductions!) on in-progress - operations. - """ - - @enum.unique - class Kind(enum.Enum): - """Identifies the overall kind of a FrontToBackPacket.""" - - COMMENCEMENT = 'commencement' - CONTINUATION = 'continuation' - COMPLETION = 'completion' - ENTIRE = 'entire' - CANCELLATION = 'cancellation' - EXPIRATION = 'expiration' - SERVICER_FAILURE = 'servicer failure' - SERVICED_FAILURE = 'serviced failure' - RECEPTION_FAILURE = 'reception failure' - TRANSMISSION_FAILURE = 'transmission failure' - - -class BackToFrontPacket( - collections.namedtuple( - 'BackToFrontPacket', - ['operation_id', 'sequence_number', 'kind', 'payload'])): - """A sum type for all values sent from a back to a front. - - Attributes: - operation_id: A unique-with-respect-to-equality hashable object identifying - a particular operation. - sequence_number: A zero-indexed integer sequence number identifying the - packet's place among all the packets sent from back to front for this - particular operation. - kind: A Kind value describing the overall kind of ticket. - payload: A customer payload object. Must be present if kind is - Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None - otherwise. - """ - - @enum.unique - class Kind(enum.Enum): - """Identifies the overall kind of a BackToFrontPacket.""" - - CONTINUATION = 'continuation' - COMPLETION = 'completion' - CANCELLATION = 'cancellation' - EXPIRATION = 'expiration' - SERVICER_FAILURE = 'servicer failure' - SERVICED_FAILURE = 'serviced failure' - RECEPTION_FAILURE = 'reception failure' - TRANSMISSION_FAILURE = 'transmission failure' diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py index d922f6e5ef05ccf203f432698938b3cca40fe0ff..eabeac4569763a115912ddbdf9a6ec294a1aa11d 100644 --- a/src/python/src/grpc/framework/face/demonstration.py +++ b/src/python/src/grpc/framework/face/demonstration.py @@ -30,7 +30,7 @@ """Demonstration-suitable implementation of the face layer of RPC Framework.""" from grpc.framework.base import util as _base_util -from grpc.framework.base.packets import implementations as _tickets_implementations +from grpc.framework.base import implementations as _base_implementations from grpc.framework.face import implementations from grpc.framework.foundation import logging_pool @@ -105,9 +105,9 @@ def server_and_stub( event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, multi_method=multi_method) - front = _tickets_implementations.front( + front = _base_implementations.front_link( front_work_pool, front_transmission_pool, front_utility_pool) - back = _tickets_implementations.back( + back = _base_implementations.back_link( servicer, back_work_pool, back_transmission_pool, back_utility_pool, default_timeout, _MAXIMUM_TIMEOUT) front.join_rear_link(back) diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py index 7872a6b9e94de2b58e74a415ab2fd6aae6aded8e..151d0ef793ca6ee9a6e5c5cfad3e5e199027bb21 100644 --- a/src/python/src/grpc/framework/face/testing/base_util.py +++ b/src/python/src/grpc/framework/face/testing/base_util.py @@ -33,9 +33,9 @@ import abc # interfaces is referenced from specification in this module. from grpc.framework.base import util as _base_util -from grpc.framework.base.packets import implementations -from grpc.framework.base.packets import in_memory -from grpc.framework.base.packets import interfaces # pylint: disable=unused-import +from grpc.framework.base import implementations +from grpc.framework.base import in_memory +from grpc.framework.base import interfaces # pylint: disable=unused-import from grpc.framework.foundation import logging_pool _POOL_SIZE_LIMIT = 20 @@ -89,9 +89,9 @@ def linked_pair(servicer, default_timeout): back_work_pool, back_transmission_pool, back_utility_pool) link = in_memory.Link(link_pool) - front = implementations.front( + front = implementations.front_link( front_work_pool, front_transmission_pool, front_utility_pool) - back = implementations.back( + back = implementations.back_link( servicer, back_work_pool, back_transmission_pool, back_utility_pool, default_timeout, _MAXIMUM_TIMEOUT) front.join_rear_link(link) diff --git a/src/python/src/grpc/framework/foundation/_logging_pool_test.py b/src/python/src/grpc/framework/foundation/_logging_pool_test.py index 11463a8bece3306688550b0ba427cf9f24bd439b..c92cf8c0ab6f8ffe054f7ad0d0f0436c6efc98b8 100644 --- a/src/python/src/grpc/framework/foundation/_logging_pool_test.py +++ b/src/python/src/grpc/framework/foundation/_logging_pool_test.py @@ -27,7 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Tests for _framework.foundation.logging_pool.""" +"""Tests for grpc.framework.foundation.logging_pool.""" import unittest diff --git a/src/python/src/setup.py b/src/python/src/setup.py index 7d93aa7ded03519d94c2132861044adc833bab1c..bd70634b8fe72dbc952dc4f74c1db19fb21c97f0 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -64,8 +64,8 @@ _PACKAGES = ( 'grpc._junkdrawer', 'grpc.early_adopter', 'grpc.framework', + 'grpc.framework.alpha', 'grpc.framework.base', - 'grpc.framework.base.packets', 'grpc.framework.common', 'grpc.framework.face', 'grpc.framework.face.testing', diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index b0b24b949f25326f8da8b9f4f80731a09271d767..b2a8711c7907a89a127173170d5835ebace4c54e 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -57,7 +57,7 @@ require 'test/cpp/interop/empty' require 'signet/ssl_config' -AUTH_ENV = Google::Auth::ServiceAccountCredentials::ENV_VAR +AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR # loads the certificates used to access the test server securely. def load_test_certs diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 7b69f1f6d0d461b0ae45dd2437a5b7fc7d42db7c..6256330e88e420a9d1c270b8760e0bdf7a9ffb45 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -505,12 +505,12 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline) + SingleReqView = view_class(:cancelled, :deadline, :metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read) + :each_remote_read, :metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 5e3d3c9f9cfcbd47fab7b4151d147314fbc596f7..2cb3d2eebf43744be89af19c5caa972c1985fbe5 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -81,6 +81,7 @@ module GRPC active_call.run_server_bidi(mth) end send_status(active_call, OK, 'OK') + active_call.finished rescue BadStatus => e # this is raised by handlers that want GRPC to send an application # error code and detail message. diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 12cb5c15580ab775ade7580895a56b93aa955012..8914225b558c5b91125af648bb9d56254218af16 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -68,7 +68,7 @@ describe GRPC::ActiveCall do describe '#multi_req_view' do xit 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled, deadline, each_remote_read, shutdown) + want = %w(cancelled, deadline, each_remote_read, metadata, shutdown) v = @client_call.multi_req_view want.each do |w| expect(v.methods.include?(w)) @@ -78,7 +78,7 @@ describe GRPC::ActiveCall do describe '#single_req_view' do xit 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled, deadline, shutdown) + want = %w(cancelled, deadline, metadata, shutdown) v = @client_call.single_req_view want.each do |w| expect(v.methods.include?(w)) diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index adf354f4eef303844148816c4d4b1db2ce17201c..73f2d37e305bdef89aa4ea71fa0b21822369e5d9 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -434,7 +434,7 @@ describe 'ClientStub' do end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK') + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end @@ -444,7 +444,7 @@ describe 'ClientStub' do c = expect_server_to_be_invoked(mtx, cnd) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK') + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end @@ -460,7 +460,7 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK') + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end @@ -473,7 +473,7 @@ describe 'ClientStub' do expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK') + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end @@ -486,7 +486,7 @@ describe 'ClientStub' do expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK') + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) end end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 8bff2a9a644719c9d22cb2e064b4c4995692dee9..39d1e83748bf4d2812278851ed2b1fbd3fef7a56 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -94,6 +94,7 @@ describe GRPC::RpcDesc do expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) expect(@call).to receive(:send_status).once.with(OK, 'OK') + expect(@call).to receive(:finished).once @request_response.run_server_method(@call, method(:fake_reqresp)) end end @@ -134,6 +135,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) expect(@call).to receive(:send_status).once.with(OK, 'OK') + expect(@call).to receive(:finished).once @client_streamer.run_server_method(@call, method(:fake_clstream)) end end @@ -178,6 +180,7 @@ describe GRPC::RpcDesc do expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:send_status).once.with(OK, 'OK') + expect(@call).to receive(:finished).once @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -207,6 +210,7 @@ describe GRPC::RpcDesc do it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) expect(@call).to receive(:send_status).once.with(OK, 'OK') + expect(@call).to receive(:finished).once @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index d5421d400c99c9f14ae2c6c0609760ee623abe45..f3b89b5895eb9ee8771dc7fdbaaf71dd6a422783 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -62,12 +62,15 @@ end class EchoService include GRPC::GenericService rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :received_md def initialize(_default_var = 'ignored') + @received_md = [] end - def an_rpc(req, _call) + def an_rpc(req, call) logger.info('echo service received a request') + @received_md << call.metadata unless call.metadata.nil? req end end @@ -337,6 +340,38 @@ describe GRPC::RpcServer do t.join end + it 'should receive metadata sent as rpc keyword args', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **@client_opts) + expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) + wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] + expect(service.received_md).to eq(wanted_md) + @srv.stop + t.join + end + + it 'should receive updated metadata', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + @client_opts[:update_metadata] = proc do |md| + md[:k1] = 'updated-v1' + md + end + stub = EchoStub.new(@host, **@client_opts) + expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) + wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }] + expect(service.received_md).to eq(wanted_md) + @srv.stop + t.join + end + it 'should handle multiple parallel requests', server: true do @srv.handle(EchoService) Thread.new { @srv.run } diff --git a/templates/Makefile.template b/templates/Makefile.template index 148fa7cd63330020492e9b89b82b025a87f5a3e9..6845b911aa10039f75e58e5146ccd260e4258e46 100644 --- a/templates/Makefile.template +++ b/templates/Makefile.template @@ -653,6 +653,11 @@ test_cxx: buildtests_cxx % endfor +test_python: static_c + $(E) "[RUN] Testing python code" + $(Q) tools/run_tests/run_tests.py -lpython -c$(CONFIG) + + tools: privatelibs\ % for tgt in targets: % if tgt.build == 'tool': diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 9cf3c624c05d3be3731bc1c60f164971b0da7a8c..3d2f117b0df417e68fe5026be519ae1efb882de4 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -39,7 +39,7 @@ import tempfile import time import unittest -from grpc.early_adopter import exceptions +from grpc.framework.alpha import exceptions from grpc.framework.foundation import future # Identifiers of entities we expect to find in the generated module. diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index ef410671ce0093ff688d0e31c42ed3f16c7be930..de6c6b7b77e6289ce5ed813ac4d72fd2f13a6dac 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -161,6 +161,15 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase( } } +void AssertOkOrPrintErrorStatus(const grpc::Status& s) { + if (s.IsOk()) { + return; + } + gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.code(), + s.details().c_str()); + GPR_ASSERT(0); +} + void DoEmpty() { gpr_log(GPR_INFO, "Sending an empty rpc..."); std::shared_ptr<ChannelInterface> channel = @@ -172,8 +181,8 @@ void DoEmpty() { ClientContext context; grpc::Status s = stub->EmptyCall(&context, request, &response); + AssertOkOrPrintErrorStatus(s); - GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Empty rpc done."); } @@ -190,7 +199,7 @@ void PerformLargeUnary(std::shared_ptr<ChannelInterface> channel, grpc::Status s = stub->UnaryCall(&context, *request, response); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); GPR_ASSERT(response->payload().type() == grpc::testing::PayloadType::COMPRESSABLE); GPR_ASSERT(response->payload().body() == @@ -284,7 +293,7 @@ void DoRequestStreaming() { grpc::Status s = stream->Finish(); GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Request streaming done."); } @@ -313,7 +322,7 @@ void DoResponseStreaming() { GPR_ASSERT(response_stream_sizes.size() == i); grpc::Status s = stream->Finish(); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Response streaming done."); } @@ -345,7 +354,7 @@ void DoResponseStreamingWithSlowConsumer() { GPR_ASSERT(kNumResponseMessages == i); grpc::Status s = stream->Finish(); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Response streaming done."); } @@ -378,7 +387,7 @@ void DoHalfDuplex() { } GPR_ASSERT(response_stream_sizes.size() == i); grpc::Status s = stream->Finish(); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Half-duplex streaming rpc done."); } @@ -411,7 +420,7 @@ void DoPingPong() { stream->WritesDone(); GPR_ASSERT(!stream->Read(&response)); grpc::Status s = stream->Finish(); - GPR_ASSERT(s.IsOk()); + AssertOkOrPrintErrorStatus(s); gpr_log(GPR_INFO, "Ping pong streaming done."); } diff --git a/tools/dockerfile/grpc_dist_proto/Dockerfile b/tools/dockerfile/grpc_dist_proto/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..b4ed3b6035d75be36c77092b65fc705fe5fb8ae2 --- /dev/null +++ b/tools/dockerfile/grpc_dist_proto/Dockerfile @@ -0,0 +1,76 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Dockerfile to build protoc and plugins for inclusion in a release. +FROM grpc/base + +# Add the file containing the gRPC version +ADD version.txt version.txt + +# Install tools needed for building protoc. +RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev + +# Get the protobuf source from GitHub. +RUN mkdir -p /var/local/git +RUN git clone https://github.com/google/protobuf.git /var/local/git/protobuf + +# Build the protobuf library statically and install to /tmp/protoc_static. +WORKDIR /var/local/git/protobuf +RUN ./autogen.sh && \ + ./configure --disable-shared --prefix=/tmp/protoc_static \ + LDFLAGS="-lgcc_eh -static-libgcc -static-libstdc++" && \ + make -j12 && make check && make install + +# Build the protobuf library dynamically and install to /usr/local. +WORKDIR /var/local/git/protobuf +RUN ./autogen.sh && \ + ./configure --prefix=/usr/local && \ + make -j12 && make check && make install + +# Build the grpc plugins. +RUN git clone https://github.com/google/grpc.git /var/local/git/grpc +WORKDIR /var/local/git/grpc +RUN LDFLAGS=-static make plugins + +# Create an archive containing all the generated binaries. +RUN mkdir /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m) +RUN cp -v bins/opt/* /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m) +RUN cp -v /tmp/protoc_static/bin/protoc /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m) +RUN cd /tmp && \ + tar -czf proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz proto-bins_$(cat /version.txt)_linux-$(uname -m) + +# List the tar contents: provides a way to visually confirm that the contents +# are correct. +RUN echo 'proto-bins_$(cat /version.txt)_linux-tar-$(uname -m) contents:' && \ + tar -ztf /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz + + + + + diff --git a/tools/dockerfile/grpc_dist_proto/version.txt b/tools/dockerfile/grpc_dist_proto/version.txt new file mode 100644 index 0000000000000000000000000000000000000000..8f0916f768f0487bcf8d33827ce2c8dcecb645c1 --- /dev/null +++ b/tools/dockerfile/grpc_dist_proto/version.txt @@ -0,0 +1 @@ +0.5.0 diff --git a/tools/dockerfile/grpc_python/Dockerfile b/tools/dockerfile/grpc_python/Dockerfile index fd07e9cc6a99ab147ca2a39b91c0a5f224de1c67..62ef785a3184384a334de3bf50e73580ba1d7901 100644 --- a/tools/dockerfile/grpc_python/Dockerfile +++ b/tools/dockerfile/grpc_python/Dockerfile @@ -54,7 +54,7 @@ RUN cd /var/local/git/grpc \ && python2.7 -B -m grpc._adapter._lonely_rear_link_test \ && python2.7 -B -m grpc._adapter._low_test \ && python2.7 -B -m grpc.early_adopter.implementations_test \ - && python2.7 -B -m grpc.framework.base.packets.implementations_test \ + && python2.7 -B -m grpc.framework.base.implementations_test \ && python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test \ && python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test \ && python2.7 -B -m grpc.framework.face.future_invocation_asynchronous_event_service_test \ diff --git a/tools/gce_setup/cloud_prod_runner.sh b/tools/gce_setup/cloud_prod_runner.sh index 3a9ae51b769572239b4475e57afd3bd6d483ea0f..e236c921ec8a25062d7de60c9f04e00ea24dc5f5 100755 --- a/tools/gce_setup/cloud_prod_runner.sh +++ b/tools/gce_setup/cloud_prod_runner.sh @@ -36,7 +36,7 @@ echo $result_file_name main() { source grpc_docker.sh test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) - auth_test_cases=(service_account_creds compute_engine_creds) + auth_test_cases=(service_account_creds compute_engine_creds jwt_token_creds) clients=(cxx java go ruby node csharp_mono) for test_case in "${test_cases[@]}" do diff --git a/tools/gce_setup/grpc_docker.sh b/tools/gce_setup/grpc_docker.sh index 3deef05ef35d888c6ff8c81f70d6968b6c72b693..497112ce3995e5f70c7f270782c283f8c9f22059 100755 --- a/tools/gce_setup/grpc_docker.sh +++ b/tools/gce_setup/grpc_docker.sh @@ -560,7 +560,7 @@ grpc_sync_scripts() { _grpc_ensure_gcloud_ssh || return 1; # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_hosts grpc_gce_script_root @@ -600,7 +600,7 @@ grpc_sync_images() { _grpc_ensure_gcloud_ssh || return 1; # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_hosts @@ -645,7 +645,7 @@ _grpc_show_servers_args() { # Shows the grpc servers on the GCE instance <server_name> grpc_show_servers() { # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone # set by _grpc_show_servers local host @@ -663,6 +663,58 @@ grpc_show_servers() { gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" } +_grpc_build_proto_bins_args() { + [[ -n $1 ]] && { # host + host=$1 + shift + } || { + host='grpc-docker-builder' + } +} + +# grpc_build_proto_bins +# +# - rebuilds the dist_proto docker image +# * doing this builds the protoc and the ruby, python and cpp bins statically +# +# - runs a docker command that copies the built protos to the GCE host +# - copies the built protos to the local machine +grpc_build_proto_bins() { + _grpc_ensure_gcloud_ssh || return 1; + + # declare vars local so that they don't pollute the shell environment + # where this func is used. + local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone + # set by _grpc_build_proto_bins_args + local host + + # set the project zone and check that all necessary args are provided + _grpc_set_project_and_zone -f _grpc_build_proto_bins_args "$@" || return 1 + gce_has_instance $grpc_project $host || return 1; + local project_opt="--project $grpc_project" + local zone_opt="--zone $grpc_zone" + + # rebuild the dist_proto image + local label='dist_proto' + grpc_update_image -- -h $host $label || return 1 + + # run a command to copy the generated archive to the docker host + local docker_prefix='sudo docker run -v /tmp:/tmp/proto_bins_out' + local tar_name='proto-bins*.tar.gz' + local cp_cmd="/bin/bash -c 'cp -v /tmp/$tar_name /tmp/proto_bins_out'" + local cmd="$docker_prefix grpc/$label $cp_cmd" + local ssh_cmd="bash -l -c \"$cmd\"" + echo "will run:" + echo " $ssh_cmd" + echo "on $host" + gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" || return 1 + + # copy the tar.gz locally + local rmt_tar="$host:/tmp/$tar_name" + local local_copy="$(pwd)" + gcloud compute copy-files $rmt_tar $local_copy $project_opt $zone_opt || return 1 +} + _grpc_launch_servers_args() { [[ -n $1 ]] && { # host host=$1 @@ -690,7 +742,7 @@ _grpc_launch_servers_args() { # If no servers are specified, it launches all known servers grpc_launch_servers() { # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone # set by _grpc_launch_servers_args local host servers @@ -811,7 +863,7 @@ test_runner() { grpc_interop_test() { _grpc_ensure_gcloud_ssh || return 1; # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone # grpc_interop_test_args @@ -853,7 +905,7 @@ grpc_interop_test() { grpc_cloud_prod_test() { _grpc_ensure_gcloud_ssh || return 1; # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone # grpc_cloud_prod_test_args @@ -892,7 +944,7 @@ grpc_cloud_prod_test() { grpc_cloud_prod_auth_test() { _grpc_ensure_gcloud_ssh || return 1; # declare vars local so that they don't pollute the shell environment - # where they this func is used. + # where this func is used. local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone # grpc_cloud_prod_test_args @@ -1192,6 +1244,20 @@ grpc_cloud_prod_auth_compute_engine_creds_gen_cxx_cmd() { echo $the_cmd } +# constructs the full dockerized cpp jwt_token auth interop test cmd. +# +# call-seq: +# flags= .... # generic flags to include the command +# cmd=$($grpc_gen_test_cmd $flags) +grpc_cloud_prod_auth_jwt_token_creds_gen_cxx_cmd() { + local cmd_prefix="sudo docker run grpc/cxx"; + local test_script="/var/local/git/grpc/bins/opt/interop_client --enable_ssl --use_prod_roots"; + local gfe_flags=$(_grpc_prod_gfe_flags) + local added_gfe_flags=$(_grpc_jwt_token_test_flags) + local the_cmd="$cmd_prefix $test_script $gfe_flags $added_gfe_flags $@"; + echo $the_cmd +} + # constructs the full dockerized csharp-mono interop test cmd. # # call-seq: @@ -1230,6 +1296,11 @@ _grpc_svc_acc_test_flags() { echo " --service_account_key_file=/service_account/stubbyCloudTestingTest-7dd63462c60c.json --oauth_scope=https://www.googleapis.com/auth/xapi.zoo" } +# outputs the flags passed to the service account auth tests +_grpc_jwt_token_test_flags() { + echo " --service_account_key_file=/service_account/stubbyCloudTestingTest-7dd63462c60c.json" +} + # default credentials test flag _grpc_default_creds_test_flags() { echo " --oauth_scope=https://www.googleapis.com/auth/xapi.zoo" diff --git a/tools/gce_setup/interop_test_runner.sh b/tools/gce_setup/interop_test_runner.sh index 430ad09b8c6308ffd952b083171a8c5b2b64d9e0..7f0b5bab1a91c6d8d7cfd10fba5ff69737a2a2cb 100755 --- a/tools/gce_setup/interop_test_runner.sh +++ b/tools/gce_setup/interop_test_runner.sh @@ -37,7 +37,7 @@ main() { source grpc_docker.sh test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) clients=(cxx java go ruby node python csharp_mono) - servers=(cxx java go ruby node python) + servers=(cxx java go ruby node python csharp_mono) for test_case in "${test_cases[@]}" do for client in "${clients[@]}" diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py index 26caf031c3a5f8b5d6a71371e58e270734824f88..81cdd0e6e407e8888aceefccb935c4fd3360c563 100755 --- a/tools/run_tests/jobset.py +++ b/tools/run_tests/jobset.py @@ -192,7 +192,7 @@ class Job(object): self._tempfile.seek(0) stdout = self._tempfile.read() message('FAILED', '%s [ret=%d]' % ( - self._spec.shortname, self._process.returncode), stdout) + self._spec.shortname, self._process.returncode), stdout, do_newline=True) else: self._state = _SUCCESS message('PASSED', '%s [time=%.1fsec]' % (self._spec.shortname, elapsed), @@ -200,7 +200,7 @@ class Job(object): if self._bin_hash: update_cache.finished(self._spec.identity(), self._bin_hash) elif self._state == _RUNNING and time.time() - self._start > 300: - message('TIMEOUT', self._spec.shortname, do_newline=self._travis) + message('TIMEOUT', self._spec.shortname, do_newline=True) self.kill() return self._state diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json index ef483d9799c38a96ad8707d9bc37245cd46ca014..dff053784dbbadfb5ef8c9a71897bffd5d3d088a 100755 --- a/tools/run_tests/python_tests.json +++ b/tools/run_tests/python_tests.json @@ -27,7 +27,7 @@ "module": "grpc.early_adopter.implementations_test" }, { - "module": "grpc.framework.base.packets.implementations_test" + "module": "grpc.framework.base.implementations_test" }, { "module": "grpc.framework.face.blocking_invocation_inline_service_test"