Skip to content
Snippets Groups Projects
Select Git revision
  • f5768a6765cae20477f36b346ce18bfb44fe2e25
  • master default protected
  • arm-aarch-platform
  • arm-platform
  • vjpai-patch-3
  • vjpai-patch-1
  • v1.27.x
  • jtattermusch-patch-2
  • jtattermusch-patch-1
  • update-java-worker-example-in-performance-docs
  • revert-21805-revert-21797-reintroduce_21527
  • revert-21804-tls-credentials-1
  • zhen_cleanup_namecheck
  • revert-21806-revert-21767-revert-21725-revert-21680-cq_ordering
  • vjpai-patch-2
  • revert-21766-tls-credentials-1
  • revert-21640-change_local_tcp_security_level
  • revert-21680-cq_ordering
  • revert-21527-unify_boringssl_deps2
  • revert-20803-grpclb_stabilization
  • fix-kokoro-rvm-key
  • v1.27.0
  • v1.27.0-pre2
  • v1.27.0-pre1
  • v1.26.0
  • v1.26.0-pre1
  • v1.25.0
  • v1.25.0-pre1
  • v1.24.3
  • v1.24.2
  • v1.24.1
  • v1.23.1
  • v1.24.0
  • v1.24.0-pre2
  • v1.24.0-pre1
  • v1.22.1
  • v1.23.0
  • v1.23.0-pre1
  • v1.22.0
  • v1.22.0-pre1
  • v1.21.4
41 results

channel_connectivity.c

Blame
  • channel_connectivity.c 6.53 KiB
    /*
     *
     * Copyright 2015, Google Inc.
     * All rights reserved.
     *
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions are
     * met:
     *
     *     * Redistributions of source code must retain the above copyright
     * notice, this list of conditions and the following disclaimer.
     *     * Redistributions in binary form must reproduce the above
     * copyright notice, this list of conditions and the following disclaimer
     * in the documentation and/or other materials provided with the
     * distribution.
     *     * Neither the name of Google Inc. nor the names of its
     * contributors may be used to endorse or promote products derived from
     * this software without specific prior written permission.
     *
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     *
     */
    
    #include "src/core/surface/channel.h"
    
    #include <grpc/support/alloc.h>
    #include <grpc/support/log.h>
    
    #include "src/core/channel/client_channel.h"
    #include "src/core/iomgr/alarm.h"
    #include "src/core/surface/completion_queue.h"
    
    /* Reports the connectivity state of `channel`.

       The query is forwarded to the last element of the channel's stack, which
       must be the client channel filter.  If it is not, an error is logged and
       GRPC_CHANNEL_FATAL_FAILURE is returned without forwarding anything.

       `try_to_connect` is handed unchanged to
       grpc_client_channel_check_connectivity_state.  Work queued on the local
       execution context is flushed with grpc_exec_ctx_finish before returning. */
    grpc_connectivity_state
    grpc_channel_check_connectivity_state (grpc_channel * channel, int try_to_connect)
    {
      /* forward through to the underlying client channel */
      grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (channel));
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      grpc_connectivity_state state;
      if (client_channel_elem->filter != &grpc_client_channel_filter)
        {
          gpr_log (GPR_ERROR, "grpc_channel_check_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name);
          return GRPC_CHANNEL_FATAL_FAILURE;
        }
      /* This function has no `closure_list`; the execution context it owns is
         `exec_ctx`.  It is passed first, like every other exec_ctx-taking call
         in this file (grpc_cq_end_op, grpc_alarm_cancel, ...).
         NOTE(review): confirm the parameter order against client_channel.h. */
      state = grpc_client_channel_check_connectivity_state (&exec_ctx, client_channel_elem, try_to_connect);
      grpc_exec_ctx_finish (&exec_ctx);
      return state;
    }
    
    /* Progress of a state_watcher towards destruction.  A watcher is torn down
       only after partly_done has run twice (once from the watch closure, once
       from the alarm) and finished_completion has run once; this enum records
       how far along that sequence it is. */
    typedef enum
    {
      /* freshly created; nothing has fired yet */
      WAITING = 0,
      /* first partly_done ran and posted the completion to the queue */
      CALLING_BACK = 1,
      /* second partly_done ran while the completion was still outstanding;
         finished_completion will delete the watcher */
      CALLING_BACK_AND_FINISHED = 2,
      /* finished_completion ran after only one partly_done; the second
         partly_done will delete the watcher */
      CALLED_BACK = 3
    } callback_phase;
    
    /* Bookkeeping for one grpc_channel_watch_connectivity_state request.
       Allocated with gpr_malloc when the watch is started and released by
       delete_state_watcher. */
    typedef struct
    {
      /* held around every access to phase, success and removed */
      gpr_mu mu;
      /* where the watcher is in its teardown sequence (see callback_phase) */
      callback_phase phase;
      /* value reported to grpc_cq_end_op; set to 1 when the watch closure runs */
      int success;
      /* non-zero once partly_done has called
         grpc_client_channel_del_interested_party for this watcher */
      int removed;
      /* closure bound to watch_complete, handed to the client channel */
      grpc_closure on_complete;
      /* deadline alarm bound to timeout_complete; cancelled when the watch
         closure runs first */
      grpc_alarm alarm;
      /* seeded with the caller's last observed state; its address is passed to
         grpc_client_channel_watch_connectivity_state */
      grpc_connectivity_state state;
      /* completion queue on which `tag` is posted */
      grpc_completion_queue *cq;
      /* storage passed to grpc_cq_end_op for the queued completion */
      grpc_cq_completion completion_storage;
      /* channel being watched; unreffed in delete_state_watcher */
      grpc_channel *channel;
      /* caller-supplied tag passed to grpc_cq_end_op */
      void *tag;
    } state_watcher;
    
    /* Final teardown of a watcher: drops its "watch_connectivity" reference on
       the channel, destroys the mutex, then frees the allocation.  `w` must not
       be touched after this returns. */
    static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
      /* The channel pointer is read before the storage holding it is freed. */
      GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_connectivity");
      gpr_mu_destroy(&w->mu);
      gpr_free(w);
    }
    
    /* Callback handed to grpc_cq_end_op; `pw` is the state_watcher whose
       completion was queued.  Advances the watcher's phase:
         CALLING_BACK              -> CALLED_BACK (a partly_done call is still due)
         CALLING_BACK_AND_FINISHED -> the watcher is deleted here
       Being invoked in WAITING or CALLED_BACK is a logic error and aborts.
       The completion argument is unused. */
    static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
                                    grpc_cq_completion *ignored) {
      state_watcher *watcher = pw;
      int destroy_now = 0;
      callback_phase current;

      gpr_mu_lock(&watcher->mu);
      current = watcher->phase;
      if (current == CALLING_BACK) {
        watcher->phase = CALLED_BACK;
      } else if (current == CALLING_BACK_AND_FINISHED) {
        destroy_now = 1;
      } else if (current == WAITING || current == CALLED_BACK) {
        gpr_log(GPR_ERROR, "should never reach here");
        abort();
      }
      gpr_mu_unlock(&watcher->mu);

      /* Deletion destroys the mutex, so it has to happen after the unlock. */
      if (destroy_now) {
        delete_state_watcher(exec_ctx, watcher);
      }
    }
    
    /* Records that one of the two events tracked by a state_watcher has fired.

       It is called from watch_complete with due_to_completion = 1 and from
       timeout_complete with due_to_completion = 0.  Each call:
         - on the first call only (w->removed == 0), removes the completion
           queue's pollset from the client channel's interested parties;
         - if due_to_completion is set, marks the watch successful and cancels
           the deadline alarm;
         - advances w->phase: the first call posts the completion to the queue
           via grpc_cq_end_op, and a later call either marks the watcher as
           finished (completion still outstanding) or deletes it (completion
           already consumed by finished_completion). */
    static void
    partly_done (grpc_exec_ctx * exec_ctx, state_watcher * w, int due_to_completion)
    {
      int delete = 0;
      grpc_channel_element *client_channel_elem = NULL;

      gpr_mu_lock (&w->mu);
      if (w->removed == 0)
        {
          w->removed = 1;
          client_channel_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (w->channel));
          /* grpc_cq_pollset takes only the queue (see its use in
             grpc_channel_watch_connectivity_state); the execution context
             belongs to the del_interested_party call, mirroring the context
             argument its add_interested_party counterpart receives.
             NOTE(review): confirm the parameter order against client_channel.h. */
          grpc_client_channel_del_interested_party (exec_ctx, client_channel_elem, grpc_cq_pollset (w->cq));
        }
      gpr_mu_unlock (&w->mu);
      if (due_to_completion)
        {
          gpr_mu_lock (&w->mu);
          w->success = 1;
          gpr_mu_unlock (&w->mu);
          /* cancelled with the mutex released */
          grpc_alarm_cancel (exec_ctx, &w->alarm);
        }

      gpr_mu_lock (&w->mu);
      switch (w->phase)
        {
        case WAITING:
          w->phase = CALLING_BACK;
          grpc_cq_end_op (exec_ctx, w->cq, w->tag, w->success, finished_completion, w, &w->completion_storage);
          break;
        case CALLING_BACK:
          w->phase = CALLING_BACK_AND_FINISHED;
          break;
        case CALLING_BACK_AND_FINISHED:
          gpr_log (GPR_ERROR, "should never reach here");
          abort ();
          break;
        case CALLED_BACK:
          delete = 1;
          break;
        }
      gpr_mu_unlock (&w->mu);

      /* deletion destroys w->mu, so it must follow the unlock */
      if (delete)
        {
          delete_state_watcher (exec_ctx, w);
        }
    }
    
    /* Closure callback stored in state_watcher.on_complete.  `pw` is the
       watcher; `success` is not consulted.  Reports the "completion" half of
       the watcher's two events to partly_done. */
    static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
      state_watcher *watcher = pw;
      (void)success;
      partly_done(exec_ctx, watcher, 1);
    }
    
    /* Alarm callback installed by grpc_alarm_init.  `pw` is the watcher;
       `success` is not consulted.  Reports the "alarm" half of the watcher's
       two events to partly_done. */
    static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
      state_watcher *watcher = pw;
      (void)success;
      partly_done(exec_ctx, watcher, 0);
    }
    
    /* Starts a one-shot watch on the connectivity state of `channel`.

       A state_watcher is allocated and armed with two triggers: a closure given
       to the client channel together with `last_observed_state`, and an alarm
       set for `deadline` (converted to GPR_CLOCK_MONOTONIC).  `tag` is posted
       on `cq` once, by whichever trigger is handled first; the watcher is
       freed after both triggers and the queue completion have been processed.

       If the last element of the channel stack is not the client channel
       filter, an error is logged and the watch closure is scheduled
       immediately instead of being registered with the client channel. */
    void
    grpc_channel_watch_connectivity_state (grpc_channel * channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue * cq, void *tag)
    {
      grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (channel));
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      state_watcher *w = gpr_malloc (sizeof (*w));
      int is_client_channel = (client_channel_elem->filter == &grpc_client_channel_filter);

      grpc_cq_begin_op (cq);

      gpr_mu_init (&w->mu);
      grpc_closure_init (&w->on_complete, watch_complete, w);
      w->phase = WAITING;
      w->state = last_observed_state;
      w->success = 0;
      /* On the error path no interested party is ever added, so the watcher
         starts out marked as removed; this keeps partly_done from passing a
         non-client-channel element to
         grpc_client_channel_del_interested_party.  It is set before the alarm
         is armed so the alarm callback cannot observe a stale value. */
      w->removed = is_client_channel ? 0 : 1;
      w->cq = cq;
      w->tag = tag;
      w->channel = channel;
      /* delete_state_watcher drops this reference unconditionally, so it has
         to be taken on every path, not only the client-channel one. */
      GRPC_CHANNEL_INTERNAL_REF (channel, "watch_connectivity");

      /* This function has no `closure_list`; the local execution context is
         `exec_ctx`, passed first as in the other exec_ctx-taking calls in this
         file.  NOTE(review): confirm parameter order against alarm.h and
         client_channel.h. */
      grpc_alarm_init (&exec_ctx, &w->alarm, gpr_convert_clock_type (deadline, GPR_CLOCK_MONOTONIC), timeout_complete, w, gpr_now (GPR_CLOCK_MONOTONIC));

      if (!is_client_channel)
        {
          gpr_log (GPR_ERROR, "grpc_channel_watch_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name);
          /* Schedule the watch closure on the local context; it runs no later
             than grpc_exec_ctx_finish below.
             NOTE(review): assumes grpc_exec_ctx_enqueue is provided by
             exec_ctx.h in this revision -- confirm. */
          grpc_exec_ctx_enqueue (&exec_ctx, &w->on_complete, 1);
        }
      else
        {
          grpc_client_channel_add_interested_party (&exec_ctx, client_channel_elem, grpc_cq_pollset (cq));
          grpc_client_channel_watch_connectivity_state (&exec_ctx, client_channel_elem, &w->state, &w->on_complete);
        }

      grpc_exec_ctx_finish (&exec_ctx);
    }