Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / resolving_lb_policy.cc
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/resolving_lb_policy.cc
new file mode 100644 (file)
index 0000000..f4c0f92
--- /dev/null
@@ -0,0 +1,499 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/ext/filters/client_channel/service_config.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_metadata.h"
+
+namespace grpc_core {
+
+//
+// ResolvingLoadBalancingPolicy::ResolverResultHandler
+//
+
+class ResolvingLoadBalancingPolicy::ResolverResultHandler
+    : public Resolver::ResultHandler {
+ public:
+  explicit ResolverResultHandler(
+      RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
+      : parent_(std::move(parent)) {}
+
+  ~ResolverResultHandler() {
+    if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
+      gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
+              parent_.get());
+    }
+  }
+
+  void ReturnResult(Resolver::Result result) override {
+    parent_->OnResolverResultChangedLocked(std::move(result));
+  }
+
+  void ReturnError(grpc_error* error) override {
+    parent_->OnResolverError(error);
+  }
+
+ private:
+  RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
+};
+
+//
+// ResolvingLoadBalancingPolicy::ResolvingControlHelper
+//
+
+class ResolvingLoadBalancingPolicy::ResolvingControlHelper
+    : public LoadBalancingPolicy::ChannelControlHelper {
+ public:
+  explicit ResolvingControlHelper(
+      RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
+      : parent_(std::move(parent)) {}
+
+  RefCountedPtr<SubchannelInterface> CreateSubchannel(
+      const grpc_channel_args& args) override {
+    if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
+    if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
+    return parent_->channel_control_helper()->CreateSubchannel(args);
+  }
+
+  void UpdateState(grpc_connectivity_state state,
+                   UniquePtr<SubchannelPicker> picker) override {
+    if (parent_->resolver_ == nullptr) return;  // Shutting down.
+    // If this request is from the pending child policy, ignore it until
+    // it reports READY, at which point we swap it into place.
+    if (CalledByPendingChild()) {
+      if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
+        gpr_log(GPR_INFO,
+                "resolving_lb=%p helper=%p: pending child policy %p reports "
+                "state=%s",
+                parent_.get(), this, child_,
+                grpc_connectivity_state_name(state));
+      }
+      if (state != GRPC_CHANNEL_READY) return;
+      grpc_pollset_set_del_pollset_set(
+          parent_->lb_policy_->interested_parties(),
+          parent_->interested_parties());
+      parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
+    } else if (!CalledByCurrentChild()) {
+      // This request is from an outdated child, so ignore it.
+      return;
+    }
+    parent_->channel_control_helper()->UpdateState(state, std::move(picker));
+  }
+
+  void RequestReresolution() override {
+    // If there is a pending child policy, ignore re-resolution requests
+    // from the current child policy (or any outdated child).
+    if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
+      return;
+    }
+    if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
+      gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
+              parent_.get());
+    }
+    if (parent_->resolver_ != nullptr) {
+      parent_->resolver_->RequestReresolutionLocked();
+    }
+  }
+
+  void AddTraceEvent(TraceSeverity severity, StringView message) override {}
+
+  void set_child(LoadBalancingPolicy* child) { child_ = child; }
+
+ private:
+  bool CalledByPendingChild() const {
+    GPR_ASSERT(child_ != nullptr);
+    return child_ == parent_->pending_lb_policy_.get();
+  }
+
+  bool CalledByCurrentChild() const {
+    GPR_ASSERT(child_ != nullptr);
+    return child_ == parent_->lb_policy_.get();
+  };
+
+  RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
+  LoadBalancingPolicy* child_ = nullptr;
+};
+
+//
+// ResolvingLoadBalancingPolicy
+//
+
+ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
+    Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
+    ProcessResolverResultCallback process_resolver_result,
+    void* process_resolver_result_user_data)
+    : LoadBalancingPolicy(std::move(args)),
+      tracer_(tracer),
+      target_uri_(std::move(target_uri)),
+      process_resolver_result_(process_resolver_result),
+      process_resolver_result_user_data_(process_resolver_result_user_data) {
+  GPR_ASSERT(process_resolver_result != nullptr);
+  resolver_ = ResolverRegistry::CreateResolver(
+      target_uri_.get(), args.args, interested_parties(), combiner(),
+      UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
+  // Since the validity of args has been checked when create the channel,
+  // CreateResolver() must return a non-null result.
+  GPR_ASSERT(resolver_ != nullptr);
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
+  }
+  channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_CONNECTING,
+      UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
+  resolver_->StartLocked();
+}
+
+ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
+  GPR_ASSERT(resolver_ == nullptr);
+  GPR_ASSERT(lb_policy_ == nullptr);
+}
+
+void ResolvingLoadBalancingPolicy::ShutdownLocked() {
+  if (resolver_ != nullptr) {
+    resolver_.reset();
+    if (lb_policy_ != nullptr) {
+      if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+        gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
+                lb_policy_.get());
+      }
+      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+                                       interested_parties());
+      lb_policy_.reset();
+    }
+    if (pending_lb_policy_ != nullptr) {
+      if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+        gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
+                this, pending_lb_policy_.get());
+      }
+      grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
+                                       interested_parties());
+      pending_lb_policy_.reset();
+    }
+  }
+}
+
+void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
+  if (lb_policy_ != nullptr) {
+    lb_policy_->ExitIdleLocked();
+    if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
+  }
+}
+
+void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
+  if (resolver_ != nullptr) {
+    resolver_->ResetBackoffLocked();
+    resolver_->RequestReresolutionLocked();
+  }
+  if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
+  if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
+}
+
+void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
+  if (resolver_ == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
+            grpc_error_string(error));
+  }
+  // If we already have an LB policy from a previous resolution
+  // result, then we continue to let it set the connectivity state.
+  // Otherwise, we go into TRANSIENT_FAILURE.
+  if (lb_policy_ == nullptr) {
+    grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+        "Resolver transient failure", &error, 1);
+    channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_TRANSIENT_FAILURE,
+        UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
+    const char* lb_policy_name,
+    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
+    Resolver::Result result, TraceStringVector* trace_strings) {
+  // If the child policy name changes, we need to create a new child
+  // policy.  When this happens, we leave child_policy_ as-is and store
+  // the new child policy in pending_child_policy_.  Once the new child
+  // policy transitions into state READY, we swap it into child_policy_,
+  // replacing the original child policy.  So pending_child_policy_ is
+  // non-null only between when we apply an update that changes the child
+  // policy name and when the new child reports state READY.
+  //
+  // Updates can arrive at any point during this transition.  We always
+  // apply updates relative to the most recently created child policy,
+  // even if the most recent one is still in pending_child_policy_.  This
+  // is true both when applying the updates to an existing child policy
+  // and when determining whether we need to create a new policy.
+  //
+  // As a result of this, there are several cases to consider here:
+  //
+  // 1. We have no existing child policy (i.e., we have started up but
+  //    have not yet received a serverlist from the balancer or gone
+  //    into fallback mode; in this case, both child_policy_ and
+  //    pending_child_policy_ are null).  In this case, we create a
+  //    new child policy and store it in child_policy_.
+  //
+  // 2. We have an existing child policy and have no pending child policy
+  //    from a previous update (i.e., either there has not been a
+  //    previous update that changed the policy name, or we have already
+  //    finished swapping in the new policy; in this case, child_policy_
+  //    is non-null but pending_child_policy_ is null).  In this case:
+  //    a. If child_policy_->name() equals child_policy_name, then we
+  //       update the existing child policy.
+  //    b. If child_policy_->name() does not equal child_policy_name,
+  //       we create a new policy.  The policy will be stored in
+  //       pending_child_policy_ and will later be swapped into
+  //       child_policy_ by the helper when the new child transitions
+  //       into state READY.
+  //
+  // 3. We have an existing child policy and have a pending child policy
+  //    from a previous update (i.e., a previous update set
+  //    pending_child_policy_ as per case 2b above and that policy has
+  //    not yet transitioned into state READY and been swapped into
+  //    child_policy_; in this case, both child_policy_ and
+  //    pending_child_policy_ are non-null).  In this case:
+  //    a. If pending_child_policy_->name() equals child_policy_name,
+  //       then we update the existing pending child policy.
+  //    b. If pending_child_policy->name() does not equal
+  //       child_policy_name, then we create a new policy.  The new
+  //       policy is stored in pending_child_policy_ (replacing the one
+  //       that was there before, which will be immediately shut down)
+  //       and will later be swapped into child_policy_ by the helper
+  //       when the new child transitions into state READY.
+  const bool create_policy =
+      // case 1
+      lb_policy_ == nullptr ||
+      // case 2b
+      (pending_lb_policy_ == nullptr &&
+       strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
+      // case 3b
+      (pending_lb_policy_ != nullptr &&
+       strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
+  LoadBalancingPolicy* policy_to_update = nullptr;
+  if (create_policy) {
+    // Cases 1, 2b, and 3b: create a new child policy.
+    // If lb_policy_ is null, we set it (case 1), else we set
+    // pending_lb_policy_ (cases 2b and 3b).
+    if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+      gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
+              lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
+    }
+    auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
+    lb_policy =
+        CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
+    policy_to_update = lb_policy.get();
+  } else {
+    // Cases 2a and 3a: update an existing policy.
+    // If we have a pending child policy, send the update to the pending
+    // policy (case 3a), else send it to the current policy (case 2a).
+    policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
+                                                     : lb_policy_.get();
+  }
+  GPR_ASSERT(policy_to_update != nullptr);
+  // Update the policy.
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
+            policy_to_update == pending_lb_policy_.get() ? "pending " : "",
+            policy_to_update);
+  }
+  UpdateArgs update_args;
+  update_args.addresses = std::move(result.addresses);
+  update_args.config = std::move(lb_policy_config);
+  // TODO(roth): Once channel args is converted to C++, use std::move() here.
+  update_args.args = result.args;
+  result.args = nullptr;
+  policy_to_update->UpdateLocked(std::move(update_args));
+}
+
+// Creates a new LB policy.
+// Updates trace_strings to indicate what was done.
+OrphanablePtr<LoadBalancingPolicy>
+ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
+    const char* lb_policy_name, const grpc_channel_args& args,
+    TraceStringVector* trace_strings) {
+  ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
+  LoadBalancingPolicy::Args lb_policy_args;
+  lb_policy_args.combiner = combiner();
+  lb_policy_args.channel_control_helper =
+      UniquePtr<ChannelControlHelper>(helper);
+  lb_policy_args.args = &args;
+  OrphanablePtr<LoadBalancingPolicy> lb_policy =
+      LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+          lb_policy_name, std::move(lb_policy_args));
+  if (GPR_UNLIKELY(lb_policy == nullptr)) {
+    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+    char* str;
+    gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
+    trace_strings->push_back(str);
+    return nullptr;
+  }
+  helper->set_child(lb_policy.get());
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
+            this, lb_policy_name, lb_policy.get());
+  }
+  char* str;
+  gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
+  trace_strings->push_back(str);
+  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
+                                   interested_parties());
+  return lb_policy;
+}
+
+void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
+    bool resolution_contains_addresses, TraceStringVector* trace_strings) {
+  if (!resolution_contains_addresses &&
+      previous_resolution_contained_addresses_) {
+    trace_strings->push_back(gpr_strdup("Address list became empty"));
+  } else if (resolution_contains_addresses &&
+             !previous_resolution_contained_addresses_) {
+    trace_strings->push_back(gpr_strdup("Address list became non-empty"));
+  }
+  previous_resolution_contained_addresses_ = resolution_contains_addresses;
+}
+
+void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
+    TraceStringVector* trace_strings) const {
+  if (!trace_strings->empty()) {
+    gpr_strvec v;
+    gpr_strvec_init(&v);
+    gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
+    bool is_first = 1;
+    for (size_t i = 0; i < trace_strings->size(); ++i) {
+      if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
+      is_first = false;
+      gpr_strvec_add(&v, (*trace_strings)[i]);
+    }
+    size_t len = 0;
+    UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
+    channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
+                                            StringView(message.get()));
+    gpr_strvec_destroy(&v);
+  }
+}
+
+void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
+    Resolver::Result result) {
+  // Handle race conditions.
+  if (resolver_ == nullptr) return;
+  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+    gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
+  }
+  // We only want to trace the address resolution in the follow cases:
+  // (a) Address resolution resulted in service config change.
+  // (b) Address resolution that causes number of backends to go from
+  //     zero to non-zero.
+  // (c) Address resolution that causes number of backends to go from
+  //     non-zero to zero.
+  // (d) Address resolution that causes a new LB policy to be created.
+  //
+  // We track a list of strings to eventually be concatenated and traced.
+  TraceStringVector trace_strings;
+  const bool resolution_contains_addresses = result.addresses.size() > 0;
+  // Process the resolver result.
+  const char* lb_policy_name = nullptr;
+  RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
+  bool service_config_changed = false;
+  char* service_config_error_string = nullptr;
+  if (process_resolver_result_ != nullptr) {
+    grpc_error* service_config_error = GRPC_ERROR_NONE;
+    service_config_changed = process_resolver_result_(
+        process_resolver_result_user_data_, result, &lb_policy_name,
+        &lb_policy_config, &service_config_error);
+    if (service_config_error != GRPC_ERROR_NONE) {
+      service_config_error_string =
+          gpr_strdup(grpc_error_string(service_config_error));
+      if (lb_policy_name == nullptr) {
+        // Use an empty lb_policy_name as an indicator that we received an
+        // invalid service config and we don't have a fallback service config.
+        OnResolverError(service_config_error);
+      } else {
+        GRPC_ERROR_UNREF(service_config_error);
+      }
+    }
+  } else {
+    lb_policy_name = child_policy_name_.get();
+    lb_policy_config = child_lb_config_;
+  }
+  if (lb_policy_name != nullptr) {
+    // Create or update LB policy, as needed.
+    CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config,
+                                 std::move(result), &trace_strings);
+  }
+  // Add channel trace event.
+  if (service_config_changed) {
+    // TODO(ncteisen): might be worth somehow including a snippet of the
+    // config in the trace, at the risk of bloating the trace logs.
+    trace_strings.push_back(gpr_strdup("Service config changed"));
+  }
+  if (service_config_error_string != nullptr) {
+    trace_strings.push_back(service_config_error_string);
+    service_config_error_string = nullptr;
+  }
+  MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
+                                               &trace_strings);
+  ConcatenateAndAddChannelTraceLocked(&trace_strings);
+}
+
+}  // namespace grpc_core