Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / lb_policy / subchannel_list.h
diff --git a/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
new file mode 100644 (file)
index 0000000..34cd0f5
--- /dev/null
@@ -0,0 +1,464 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
+
+#include <grpc/support/port_platform.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+// TODO(roth): Should not need the include of subchannel.h here, since
+// that implementation should be hidden from the LB policy API.
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_interface.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+// Code for maintaining a list of subchannels within an LB policy.
+//
+// To use this, callers must create their own subclasses, like so:
+/*
+
+class MySubchannelList;  // Forward declaration.
+
+class MySubchannelData
+    : public SubchannelData<MySubchannelList, MySubchannelData> {
+ public:
+  void ProcessConnectivityChangeLocked(
+      grpc_connectivity_state connectivity_state) override {
+    // ...code to handle connectivity changes...
+  }
+};
+
+class MySubchannelList
+    : public SubchannelList<MySubchannelList, MySubchannelData> {
+};
+
+*/
+// All methods will be called from within the client_channel combiner.
+
+namespace grpc_core {
+
+// Forward declaration.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList;
+
+// Stores data for a particular subchannel in a subchannel list.
+// Callers must create a subclass that implements the
+// ProcessConnectivityChangeLocked() method.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelData {
+ public:
+  // Returns a pointer to the subchannel list containing this object.
+  SubchannelListType* subchannel_list() const {
+    return static_cast<SubchannelListType*>(subchannel_list_);
+  }
+
+  // Returns the index into the subchannel list of this object.
+  size_t Index() const {
+    return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
+                               subchannel_list_->subchannel(0));
+  }
+
+  // Returns a pointer to the subchannel.
+  SubchannelInterface* subchannel() const { return subchannel_.get(); }
+
+  // Synchronously checks the subchannel's connectivity state.
+  // Must not be called while there is a connectivity notification
+  // pending (i.e., between calling StartConnectivityWatchLocked() and
+  // calling CancelConnectivityWatchLocked()).
+  grpc_connectivity_state CheckConnectivityStateLocked() {
+    GPR_ASSERT(pending_watcher_ == nullptr);
+    connectivity_state_ = subchannel_->CheckConnectivityState();
+    return connectivity_state_;
+  }
+
+  // Resets the connection backoff.
+  // TODO(roth): This method should go away when we move the backoff
+  // code out of the subchannel and into the LB policies.
+  void ResetBackoffLocked();
+
+  // Starts watching the connectivity state of the subchannel.
+  // ProcessConnectivityChangeLocked() will be called whenever the
+  // connectivity state changes.
+  void StartConnectivityWatchLocked();
+
+  // Cancels watching the connectivity state of the subchannel.
+  void CancelConnectivityWatchLocked(const char* reason);
+
+  // Cancels any pending connectivity watch and unrefs the subchannel.
+  void ShutdownLocked();
+
+  GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+  SubchannelData(
+      SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
+      const ServerAddress& address,
+      RefCountedPtr<SubchannelInterface> subchannel);
+
+  virtual ~SubchannelData();
+
+  // After StartConnectivityWatchLocked() is called, this method will be
+  // invoked whenever the subchannel's connectivity state changes.
+  // To stop watching, use CancelConnectivityWatchLocked().
+  virtual void ProcessConnectivityChangeLocked(
+      grpc_connectivity_state connectivity_state) GRPC_ABSTRACT;
+
+ private:
+  // Watcher for subchannel connectivity state.
+  class Watcher
+      : public SubchannelInterface::ConnectivityStateWatcherInterface {
+   public:
+    Watcher(
+        SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
+        RefCountedPtr<SubchannelListType> subchannel_list)
+        : subchannel_data_(subchannel_data),
+          subchannel_list_(std::move(subchannel_list)) {}
+
+    ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
+
+    void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
+
+    grpc_pollset_set* interested_parties() override {
+      return subchannel_list_->policy()->interested_parties();
+    }
+
+   private:
+    SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
+    RefCountedPtr<SubchannelListType> subchannel_list_;
+  };
+
+  // Unrefs the subchannel.
+  void UnrefSubchannelLocked(const char* reason);
+
+  // Backpointer to owning subchannel list.  Not owned.
+  SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
+  // The subchannel.
+  RefCountedPtr<SubchannelInterface> subchannel_;
+  // Will be non-null when the subchannel's state is being watched.
+  SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
+      nullptr;
+  // Data updated by the watcher.
+  grpc_connectivity_state connectivity_state_;
+};
+
+// A list of subchannels.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList : public InternallyRefCounted<SubchannelListType> {
+ public:
+  typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
+
+  // The number of subchannels in the list.
+  size_t num_subchannels() const { return subchannels_.size(); }
+
+  // The data for the subchannel at a particular index.
+  SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
+
+  // Returns true if the subchannel list is shutting down.
+  bool shutting_down() const { return shutting_down_; }
+
+  // Accessors.
+  LoadBalancingPolicy* policy() const { return policy_; }
+  TraceFlag* tracer() const { return tracer_; }
+
+  // Resets connection backoff of all subchannels.
+  // TODO(roth): We will probably need to rethink this as part of moving
+  // the backoff code out of subchannels and into LB policies.
+  void ResetBackoffLocked();
+
+  void Orphan() override {
+    ShutdownLocked();
+    InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
+  }
+
+  GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+  SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
+                 const ServerAddressList& addresses,
+                 LoadBalancingPolicy::ChannelControlHelper* helper,
+                 const grpc_channel_args& args);
+
+  virtual ~SubchannelList();
+
+ private:
+  // So New() can call our private ctor.
+  template <typename T, typename... Args>
+  friend T* New(Args&&... args);
+
+  // For accessing Ref() and Unref().
+  friend class SubchannelData<SubchannelListType, SubchannelDataType>;
+
+  void ShutdownLocked();
+
+  // Backpointer to owning policy.
+  LoadBalancingPolicy* policy_;
+
+  TraceFlag* tracer_;
+
+  // The list of subchannels.
+  SubchannelVector subchannels_;
+
+  // Is this list shutting down? This may be true due to the shutdown of the
+  // policy itself or because a newer update has arrived while this one hadn't
+  // finished processing.
+  bool shutting_down_ = false;
+};
+
+//
+// implementation -- no user-servicable parts below
+//
+
+//
+// SubchannelData::Watcher
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
+    OnConnectivityStateChange(grpc_connectivity_state new_state) {
+  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+    gpr_log(GPR_INFO,
+            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+            " (subchannel %p): connectivity changed: state=%s, "
+            "shutting_down=%d, pending_watcher=%p",
+            subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+            subchannel_list_.get(), subchannel_data_->Index(),
+            subchannel_list_->num_subchannels(),
+            subchannel_data_->subchannel_.get(),
+            grpc_connectivity_state_name(new_state),
+            subchannel_list_->shutting_down(),
+            subchannel_data_->pending_watcher_);
+  }
+  if (!subchannel_list_->shutting_down() &&
+      subchannel_data_->pending_watcher_ != nullptr) {
+    subchannel_data_->connectivity_state_ = new_state;
+    // Call the subclass's ProcessConnectivityChangeLocked() method.
+    subchannel_data_->ProcessConnectivityChangeLocked(new_state);
+  }
+}
+
+//
+// SubchannelData
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
+    SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
+    const ServerAddress& address, RefCountedPtr<SubchannelInterface> subchannel)
+    : subchannel_list_(subchannel_list),
+      subchannel_(std::move(subchannel)),
+      // We assume that the current state is IDLE.  If not, we'll get a
+      // callback telling us that.
+      connectivity_state_(GRPC_CHANNEL_IDLE) {}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
+  GPR_ASSERT(subchannel_ == nullptr);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+    UnrefSubchannelLocked(const char* reason) {
+  if (subchannel_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+      gpr_log(GPR_INFO,
+              "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+              " (subchannel %p): unreffing subchannel",
+              subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+              subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+              subchannel_.get());
+    }
+    subchannel_.reset();
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+                    SubchannelDataType>::ResetBackoffLocked() {
+  if (subchannel_ != nullptr) {
+    subchannel_->ResetBackoff();
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+                    SubchannelDataType>::StartConnectivityWatchLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+    gpr_log(GPR_INFO,
+            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+            " (subchannel %p): starting watch (from %s)",
+            subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+            subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+            subchannel_.get(),
+            grpc_connectivity_state_name(connectivity_state_));
+  }
+  GPR_ASSERT(pending_watcher_ == nullptr);
+  pending_watcher_ =
+      New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
+  subchannel_->WatchConnectivityState(
+      connectivity_state_,
+      UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>(
+          pending_watcher_));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+    CancelConnectivityWatchLocked(const char* reason) {
+  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
+    gpr_log(GPR_INFO,
+            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+            " (subchannel %p): canceling connectivity watch (%s)",
+            subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+            subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+            subchannel_.get(), reason);
+  }
+  if (pending_watcher_ != nullptr) {
+    subchannel_->CancelConnectivityStateWatch(pending_watcher_);
+    pending_watcher_ = nullptr;
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+  if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
+  UnrefSubchannelLocked("shutdown");
+}
+
+//
+// SubchannelList
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
+    LoadBalancingPolicy* policy, TraceFlag* tracer,
+    const ServerAddressList& addresses,
+    LoadBalancingPolicy::ChannelControlHelper* helper,
+    const grpc_channel_args& args)
+    : InternallyRefCounted<SubchannelListType>(tracer),
+      policy_(policy),
+      tracer_(tracer) {
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO,
+            "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+            tracer_->name(), policy, this, addresses.size());
+  }
+  subchannels_.reserve(addresses.size());
+  // We need to remove the LB addresses in order to be able to compare the
+  // subchannel keys of subchannels from a different batch of addresses.
+  // We remove the service config, since it will be passed into the
+  // subchannel via call context.
+  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+                                         GRPC_ARG_SERVICE_CONFIG};
+  // Create a subchannel for each address.
+  for (size_t i = 0; i < addresses.size(); i++) {
+    // TODO(roth): we should ideally hide this from the LB policy code. In
+    // principle, if we're dealing with this special case in the client_channel
+    // code for selecting grpclb, then we should also strip out these addresses
+    // there if we're not using grpclb.
+    if (addresses[i].IsBalancer()) {
+      continue;
+    }
+    InlinedVector<grpc_arg, 3> args_to_add;
+    const size_t subchannel_address_arg_index = args_to_add.size();
+    args_to_add.emplace_back(
+        Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
+    if (addresses[i].args() != nullptr) {
+      for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
+        args_to_add.emplace_back(addresses[i].args()->args[j]);
+      }
+    }
+    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+        &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
+        args_to_add.data(), args_to_add.size());
+    gpr_free(args_to_add[subchannel_address_arg_index].value.string);
+    RefCountedPtr<SubchannelInterface> subchannel =
+        helper->CreateSubchannel(*new_args);
+    grpc_channel_args_destroy(new_args);
+    if (subchannel == nullptr) {
+      // Subchannel could not be created.
+      if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+        char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
+        gpr_log(GPR_INFO,
+                "[%s %p] could not create subchannel for address uri %s, "
+                "ignoring",
+                tracer_->name(), policy_, address_uri);
+        gpr_free(address_uri);
+      }
+      continue;
+    }
+    if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+      char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
+      gpr_log(GPR_INFO,
+              "[%s %p] subchannel list %p index %" PRIuPTR
+              ": Created subchannel %p for address uri %s",
+              tracer_->name(), policy_, this, subchannels_.size(),
+              subchannel.get(), address_uri);
+      gpr_free(address_uri);
+    }
+    subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
+            policy_, this);
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
+            tracer_->name(), policy_, this);
+  }
+  GPR_ASSERT(!shutting_down_);
+  shutting_down_ = true;
+  for (size_t i = 0; i < subchannels_.size(); i++) {
+    SubchannelDataType* sd = &subchannels_[i];
+    sd->ShutdownLocked();
+  }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType,
+                    SubchannelDataType>::ResetBackoffLocked() {
+  for (size_t i = 0; i < subchannels_.size(); i++) {
+    SubchannelDataType* sd = &subchannels_[i];
+    sd->ResetBackoffLocked();
+  }
+}
+
+}  // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */