--- /dev/null
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Round Robin Policy.
+ *
+ * Before every pick, the \a get_next_ready_subchannel_index_locked function
+ * returns the p->subchannel_list->subchannels index for next subchannel,
+ * respecting the relative order of the addresses provided upon creation or
+ * updates. Note however that updates will start picking from the beginning of
+ * the updated list. */
+
+#include <grpc/support/port_platform.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+namespace grpc_core {
+
+TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
+
+namespace {
+
+//
+// round_robin LB policy
+//
+
+constexpr char kRoundRobin[] = "round_robin";
+
+class RoundRobin : public LoadBalancingPolicy {
+ public:
+ explicit RoundRobin(Args args);
+
+ const char* name() const override { return kRoundRobin; }
+
+ void UpdateLocked(UpdateArgs args) override;
+ void ResetBackoffLocked() override;
+
+ private:
+ ~RoundRobin();
+
+ // Forward declaration.
+ class RoundRobinSubchannelList;
+
+ // Data for a particular subchannel in a subchannel list.
+ // This subclass adds the following functionality:
+ // - Tracks the previous connectivity state of the subchannel, so that
+ // we know how many subchannels are in each state.
+ class RoundRobinSubchannelData
+ : public SubchannelData<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelData(
+ SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
+ subchannel_list,
+ const ServerAddress& address,
+ RefCountedPtr<SubchannelInterface> subchannel)
+ : SubchannelData(subchannel_list, address, std::move(subchannel)) {}
+
+ grpc_connectivity_state connectivity_state() const {
+ return last_connectivity_state_;
+ }
+
+ void UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state);
+
+ private:
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state) override;
+
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
+ };
+
+ // A list of subchannels.
+ class RoundRobinSubchannelList
+ : public SubchannelList<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
+ const ServerAddressList& addresses,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses,
+ policy->channel_control_helper(), args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
+
+ ~RoundRobinSubchannelList() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
+
+ // Starts watching the subchannels in this list.
+ void StartWatchingLocked();
+
+ // Updates the counters of subchannels in each state when a
+ // subchannel transitions from old_state to new_state.
+ void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+ grpc_connectivity_state new_state);
+
+ // If this subchannel list is the RR policy's current subchannel
+ // list, updates the RR policy's connectivity state based on the
+ // subchannel list's state counters.
+ void MaybeUpdateRoundRobinConnectivityStateLocked();
+
+ // Updates the RR policy's overall state based on the counters of
+ // subchannels in each state.
+ void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+
+ private:
+ size_t num_ready_ = 0;
+ size_t num_connecting_ = 0;
+ size_t num_transient_failure_ = 0;
+ };
+
+ class Picker : public SubchannelPicker {
+ public:
+ Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
+
+ PickResult Pick(PickArgs args) override;
+
+ private:
+ // Using pointer value only, no ref held -- do not dereference!
+ RoundRobin* parent_;
+
+ size_t last_picked_index_;
+ InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
+ };
+
+ void ShutdownLocked() override;
+
+ /** list of subchannels */
+ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
+ /** are we shutting down? */
+ bool shutdown_ = false;
+};
+
+//
+// RoundRobin::Picker
+//
+
+RoundRobin::Picker::Picker(RoundRobin* parent,
+ RoundRobinSubchannelList* subchannel_list)
+ : parent_(parent) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ RoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
+ if (sd->connectivity_state() == GRPC_CHANNEL_READY) {
+ subchannels_.push_back(sd->subchannel()->Ref());
+ }
+ }
+ // For discussion on why we generate a random starting index for
+ // the picker, see https://github.com/grpc/grpc-go/issues/2580.
+ // TODO(roth): rand(3) is not thread-safe. This should be replaced with
+ // something better as part of https://github.com/grpc/grpc/issues/17891.
+ last_picked_index_ = rand() % subchannels_.size();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR %p picker %p] created picker from subchannel_list=%p "
+ "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR,
+ parent_, this, subchannel_list, subchannels_.size(),
+ last_picked_index_);
+ }
+}
+
+RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
+ last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR %p picker %p] returning index %" PRIuPTR ", subchannel=%p",
+ parent_, this, last_picked_index_,
+ subchannels_[last_picked_index_].get());
+ }
+ PickResult result;
+ result.type = PickResult::PICK_COMPLETE;
+ result.subchannel = subchannels_[last_picked_index_];
+ return result;
+}
+
+//
+// RoundRobin
+//
+
+RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Created", this);
+ }
+}
+
+RoundRobin::~RoundRobin() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
+ }
+ GPR_ASSERT(subchannel_list_ == nullptr);
+ GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
+}
+
+void RoundRobin::ShutdownLocked() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
+ }
+ shutdown_ = true;
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
+}
+
+void RoundRobin::ResetBackoffLocked() {
+ subchannel_list_->ResetBackoffLocked();
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->ResetBackoffLocked();
+ }
+}
+
+void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
+ if (num_subchannels() == 0) return;
+ // Check current state of each subchannel synchronously, since any
+ // subchannel already used by some other channel may have a non-IDLE
+ // state.
+ for (size_t i = 0; i < num_subchannels(); ++i) {
+ grpc_connectivity_state state =
+ subchannel(i)->CheckConnectivityStateLocked();
+ if (state != GRPC_CHANNEL_IDLE) {
+ subchannel(i)->UpdateConnectivityStateLocked(state);
+ }
+ }
+ // Start connectivity watch for each subchannel.
+ for (size_t i = 0; i < num_subchannels(); i++) {
+ if (subchannel(i)->subchannel() != nullptr) {
+ subchannel(i)->StartConnectivityWatchLocked();
+ subchannel(i)->subchannel()->AttemptToConnect();
+ }
+ }
+ // Now set the LB policy's state based on the subchannels' states.
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+}
+
+void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
+ grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
+ GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+ if (old_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(num_ready_ > 0);
+ --num_ready_;
+ } else if (old_state == GRPC_CHANNEL_CONNECTING) {
+ GPR_ASSERT(num_connecting_ > 0);
+ --num_connecting_;
+ } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(num_transient_failure_ > 0);
+ --num_transient_failure_;
+ }
+ if (new_state == GRPC_CHANNEL_READY) {
+ ++num_ready_;
+ } else if (new_state == GRPC_CHANNEL_CONNECTING) {
+ ++num_connecting_;
+ } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++num_transient_failure_;
+ }
+}
+
+// Sets the RR policy's connectivity state and generates a new picker based
+// on the current subchannel list.
+void RoundRobin::RoundRobinSubchannelList::
+ MaybeUpdateRoundRobinConnectivityStateLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ // Only set connectivity state if this is the current subchannel list.
+ if (p->subchannel_list_.get() != this) return;
+ /* In priority order. The first rule to match terminates the search (ie, if we
+ * are on rule n, all previous rules were unfulfilled).
+ *
+ * 1) RULE: ANY subchannel is READY => policy is READY.
+ * CHECK: subchannel_list->num_ready > 0.
+ *
+ * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
+ * CHECK: sd->curr_connectivity_state == CONNECTING.
+ *
+ * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * TRANSIENT_FAILURE.
+ * CHECK: subchannel_list->num_transient_failures ==
+ * subchannel_list->num_subchannels.
+ */
+ if (num_ready_ > 0) {
+ /* 1) READY */
+ p->channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(p, this)));
+ } else if (num_connecting_ > 0) {
+ /* 2) CONNECTING */
+ p->channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_CONNECTING, UniquePtr<SubchannelPicker>(New<QueuePicker>(
+ p->Ref(DEBUG_LOCATION, "QueuePicker"))));
+ } else if (num_transient_failure_ == num_subchannels()) {
+ /* 3) TRANSIENT_FAILURE */
+ grpc_error* error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "connections to all backends failing"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+ p->channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
+ }
+}
+
+void RoundRobin::RoundRobinSubchannelList::
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ if (num_ready_ > 0) {
+ if (p->subchannel_list_.get() != this) {
+ // Promote this list to p->subchannel_list_.
+ // This list must be p->latest_pending_subchannel_list_, because
+ // any previous update would have been shut down already and
+ // therefore we would not be receiving a notification for them.
+ GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
+ GPR_ASSERT(!shutting_down());
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ const size_t old_num_subchannels =
+ p->subchannel_list_ != nullptr
+ ? p->subchannel_list_->num_subchannels()
+ : 0;
+ gpr_log(GPR_INFO,
+ "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
+ ") in favor of %p (size %" PRIuPTR ")",
+ p, p->subchannel_list_.get(), old_num_subchannels, this,
+ num_subchannels());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ }
+ // Update the RR policy's connectivity state if needed.
+ MaybeUpdateRoundRobinConnectivityStateLocked();
+}
+
+void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
+ "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
+ p, subchannel(), subchannel_list(), Index(),
+ subchannel_list()->num_subchannels(),
+ grpc_connectivity_state_name(last_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state));
+ }
+ subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+ connectivity_state);
+ last_connectivity_state_ = connectivity_state;
+}
+
+void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
+ GPR_ASSERT(subchannel() != nullptr);
+ // If the new state is TRANSIENT_FAILURE, re-resolve.
+ // Only do this if we've started watching, not at startup time.
+ // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+ // when the subchannel list was created, we'd wind up in a constant
+ // loop of re-resolution.
+ // Also attempt to reconnect.
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+ "Requesting re-resolution",
+ p, subchannel());
+ }
+ p->channel_control_helper()->RequestReresolution();
+ subchannel()->AttemptToConnect();
+ }
+ // Update state counters.
+ UpdateConnectivityStateLocked(connectivity_state);
+ // Update overall state and renew notification.
+ subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+}
+
+void RoundRobin::UpdateLocked(UpdateArgs args) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
+ this, args.addresses.size());
+ }
+ // Replace latest_pending_subchannel_list_.
+ if (latest_pending_subchannel_list_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Shutting down previous pending subchannel list %p", this,
+ latest_pending_subchannel_list_.get());
+ }
+ }
+ latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
+ this, &grpc_lb_round_robin_trace, args.addresses, *args.args);
+ if (latest_pending_subchannel_list_->num_subchannels() == 0) {
+ // If the new list is empty, immediately promote the new list to the
+ // current list and transition to TRANSIENT_FAILURE.
+ grpc_error* error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+ channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
+ subchannel_list_ = std::move(latest_pending_subchannel_list_);
+ } else if (subchannel_list_ == nullptr) {
+ // If there is no current list, immediately promote the new list to
+ // the current list and start watching it.
+ subchannel_list_ = std::move(latest_pending_subchannel_list_);
+ subchannel_list_->StartWatchingLocked();
+ } else {
+ // Start watching the pending list. It will get swapped into the
+ // current list when it reports READY.
+ latest_pending_subchannel_list_->StartWatchingLocked();
+ }
+}
+
+class ParsedRoundRobinConfig : public LoadBalancingPolicy::Config {
+ public:
+ const char* name() const override { return kRoundRobin; }
+};
+
+//
+// factory
+//
+
+class RoundRobinFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ LoadBalancingPolicy::Args args) const override {
+ return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(std::move(args)));
+ }
+
+ const char* name() const override { return kRoundRobin; }
+
+ RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+ const grpc_json* json, grpc_error** error) const override {
+ if (json != nullptr) {
+ GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
+ }
+ return RefCountedPtr<LoadBalancingPolicy::Config>(
+ New<ParsedRoundRobinConfig>());
+ }
+};
+
+} // namespace
+
+} // namespace grpc_core
+
+void grpc_lb_policy_round_robin_init() {
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+ grpc_core::New<grpc_core::RoundRobinFactory>()));
+}
+
+void grpc_lb_policy_round_robin_shutdown() {}