3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /// Implementation of the gRPC LB policy.
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
24 /// An internal channel (\a lb_channel_) is created for the addresses
25 /// that are balancers. This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer addresses.
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose). The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down). If needed, we retry the
33 /// call. If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
37 /// We maintain an internal child policy (round_robin) instance for distributing
38 /// requests across backends. Whenever we receive a new serverlist from
39 /// the balancer, we update the child policy with the new list of addresses.
42 /// Once a child policy instance is in place (and getting updated as
43 /// described), calls for a pick, or a cancellation will be serviced right away
44 /// by forwarding them to the child policy instance. Any time there's no child
45 /// policy available (i.e., right after the creation of the xDS policy), pick
46 /// requests are added to a list of pending picks to be flushed and serviced
47 /// when the child policy instance becomes available.
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
52 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
53 // using that endpoint. Because of various transitive includes in uv.h,
54 // including windows.h on Windows, uv.h must be included before other system
55 // headers. Therefore, sockaddr.h must always be included first.
56 #include <grpc/support/port_platform.h>
58 #include "src/core/lib/iomgr/sockaddr.h"
59 #include "src/core/lib/iomgr/socket_utils.h"
65 #include <grpc/byte_buffer_reader.h>
66 #include <grpc/grpc.h>
67 #include <grpc/support/alloc.h>
68 #include <grpc/support/string_util.h>
69 #include <grpc/support/time.h>
71 #include "include/grpc/support/alloc.h"
72 #include "src/core/ext/filters/client_channel/client_channel.h"
73 #include "src/core/ext/filters/client_channel/lb_policy.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
78 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
80 #include "src/core/ext/filters/client_channel/parse_address.h"
81 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
82 #include "src/core/ext/filters/client_channel/server_address.h"
83 #include "src/core/ext/filters/client_channel/service_config.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/string.h"
88 #include "src/core/lib/gprpp/manual_constructor.h"
89 #include "src/core/lib/gprpp/map.h"
90 #include "src/core/lib/gprpp/memory.h"
91 #include "src/core/lib/gprpp/orphanable.h"
92 #include "src/core/lib/gprpp/ref_counted_ptr.h"
93 #include "src/core/lib/gprpp/sync.h"
94 #include "src/core/lib/iomgr/combiner.h"
95 #include "src/core/lib/iomgr/sockaddr.h"
96 #include "src/core/lib/iomgr/sockaddr_utils.h"
97 #include "src/core/lib/iomgr/timer.h"
98 #include "src/core/lib/slice/slice_hash_table.h"
99 #include "src/core/lib/slice/slice_internal.h"
100 #include "src/core/lib/slice/slice_string_helpers.h"
101 #include "src/core/lib/surface/call.h"
102 #include "src/core/lib/surface/channel.h"
103 #include "src/core/lib/surface/channel_init.h"
104 #include "src/core/lib/transport/static_metadata.h"
106 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
107 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
108 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
109 #define GRPC_XDS_RECONNECT_JITTER 0.2
110 #define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000
111 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
112 #define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000)
114 namespace grpc_core {
116 TraceFlag grpc_lb_xds_trace(false, "xds");
120 constexpr char kXds[] = "xds_experimental";
122 class ParsedXdsConfig : public LoadBalancingPolicy::Config {
124 ParsedXdsConfig(const char* balancer_name,
125 RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
126 RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
127 : balancer_name_(balancer_name),
128 child_policy_(std::move(child_policy)),
129 fallback_policy_(std::move(fallback_policy)) {}
131 const char* name() const override { return kXds; }
133 const char* balancer_name() const { return balancer_name_; };
135 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
136 return child_policy_;
139 RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const {
140 return fallback_policy_;
144 const char* balancer_name_ = nullptr;
145 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
146 RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
149 class XdsLb : public LoadBalancingPolicy {
151 explicit XdsLb(Args args);
153 const char* name() const override { return kXds; }
155 void UpdateLocked(UpdateArgs args) override;
156 void ResetBackoffLocked() override;
159 // Contains a channel to the LB server and all the data related to the
160 // channel. Holds a ref to the xds policy.
161 class LbChannelState : public InternallyRefCounted<LbChannelState> {
163 // An LB call wrapper that can restart a call upon failure. Holds a ref to
164 // the LB channel. The template parameter is the kind of wrapped LB call.
165 template <typename T>
166 class RetryableLbCall : public InternallyRefCounted<RetryableLbCall<T>> {
168 explicit RetryableLbCall(RefCountedPtr<LbChannelState> lb_chand);
170 void Orphan() override;
172 void OnCallFinishedLocked();
174 T* lb_calld() const { return lb_calld_.get(); }
175 LbChannelState* lb_chand() const { return lb_chand_.get(); }
178 void StartNewCallLocked();
179 void StartRetryTimerLocked();
180 static void OnRetryTimerLocked(void* arg, grpc_error* error);
182 // The wrapped LB call that talks to the LB server. It's instantiated
183 // every time we start a new call. It's null during call retry backoff.
184 OrphanablePtr<T> lb_calld_;
185 // The owing LB channel.
186 RefCountedPtr<LbChannelState> lb_chand_;
190 grpc_timer retry_timer_;
191 grpc_closure on_retry_timer_;
192 bool retry_timer_callback_pending_ = false;
194 bool shutting_down_ = false;
197 // Contains an EDS call to the LB server.
198 class EdsCallState : public InternallyRefCounted<EdsCallState> {
200 // The ctor and dtor should not be used directly.
201 explicit EdsCallState(
202 RefCountedPtr<RetryableLbCall<EdsCallState>> parent);
203 ~EdsCallState() override;
205 void Orphan() override;
207 RetryableLbCall<EdsCallState>* parent() const { return parent_.get(); }
208 LbChannelState* lb_chand() const { return parent_->lb_chand(); }
209 XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); }
210 bool seen_response() const { return seen_response_; }
213 static void OnResponseReceivedLocked(void* arg, grpc_error* error);
214 static void OnStatusReceivedLocked(void* arg, grpc_error* error);
216 bool IsCurrentCallOnChannel() const;
218 // The owning RetryableLbCall<>.
219 RefCountedPtr<RetryableLbCall<EdsCallState>> parent_;
220 bool seen_response_ = false;
225 // recv_initial_metadata
226 grpc_metadata_array initial_metadata_recv_;
229 grpc_byte_buffer* send_message_payload_ = nullptr;
232 grpc_byte_buffer* recv_message_payload_ = nullptr;
233 grpc_closure on_response_received_;
235 // recv_trailing_metadata
236 grpc_metadata_array trailing_metadata_recv_;
237 grpc_status_code status_code_;
238 grpc_slice status_details_;
239 grpc_closure on_status_received_;
242 // Contains an LRS call to the LB server.
243 class LrsCallState : public InternallyRefCounted<LrsCallState> {
245 // The ctor and dtor should not be used directly.
246 explicit LrsCallState(
247 RefCountedPtr<RetryableLbCall<LrsCallState>> parent);
248 ~LrsCallState() override;
250 void Orphan() override;
252 void MaybeStartReportingLocked();
254 RetryableLbCall<LrsCallState>* parent() { return parent_.get(); }
255 LbChannelState* lb_chand() const { return parent_->lb_chand(); }
256 XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); }
257 bool seen_response() const { return seen_response_; }
260 // Reports client-side load stats according to a fixed interval.
261 class Reporter : public InternallyRefCounted<Reporter> {
263 Reporter(RefCountedPtr<LrsCallState> parent,
264 grpc_millis report_interval)
265 : parent_(std::move(parent)), report_interval_(report_interval) {
267 &on_next_report_timer_, OnNextReportTimerLocked, this,
268 grpc_combiner_scheduler(xdslb_policy()->combiner()));
270 &on_report_done_, OnReportDoneLocked, this,
271 grpc_combiner_scheduler(xdslb_policy()->combiner()));
272 ScheduleNextReportLocked();
275 void Orphan() override;
278 void ScheduleNextReportLocked();
279 static void OnNextReportTimerLocked(void* arg, grpc_error* error);
280 void SendReportLocked();
281 static void OnReportDoneLocked(void* arg, grpc_error* error);
283 bool IsCurrentReporterOnCall() const {
284 return this == parent_->reporter_.get();
286 XdsLb* xdslb_policy() const { return parent_->xdslb_policy(); }
288 // The owning LRS call.
289 RefCountedPtr<LrsCallState> parent_;
291 // The load reporting state.
292 const grpc_millis report_interval_;
293 bool last_report_counters_were_zero_ = false;
294 bool next_report_timer_callback_pending_ = false;
295 grpc_timer next_report_timer_;
296 grpc_closure on_next_report_timer_;
297 grpc_closure on_report_done_;
300 static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
301 static void OnResponseReceivedLocked(void* arg, grpc_error* error);
302 static void OnStatusReceivedLocked(void* arg, grpc_error* error);
304 bool IsCurrentCallOnChannel() const;
306 // The owning RetryableLbCall<>.
307 RefCountedPtr<RetryableLbCall<LrsCallState>> parent_;
308 bool seen_response_ = false;
313 // recv_initial_metadata
314 grpc_metadata_array initial_metadata_recv_;
317 grpc_byte_buffer* send_message_payload_ = nullptr;
318 grpc_closure on_initial_request_sent_;
321 grpc_byte_buffer* recv_message_payload_ = nullptr;
322 grpc_closure on_response_received_;
324 // recv_trailing_metadata
325 grpc_metadata_array trailing_metadata_recv_;
326 grpc_status_code status_code_;
327 grpc_slice status_details_;
328 grpc_closure on_status_received_;
330 // Load reporting state.
331 grpc_millis load_reporting_interval_ = 0;
332 OrphanablePtr<Reporter> reporter_;
335 LbChannelState(RefCountedPtr<XdsLb> xdslb_policy, const char* balancer_name,
336 const grpc_channel_args& args);
339 void Orphan() override;
341 grpc_channel* channel() const { return channel_; }
342 XdsLb* xdslb_policy() const { return xdslb_policy_.get(); }
343 EdsCallState* eds_calld() const { return eds_calld_->lb_calld(); }
344 LrsCallState* lrs_calld() const { return lrs_calld_->lb_calld(); }
346 bool IsCurrentChannel() const {
347 return this == xdslb_policy_->lb_chand_.get();
349 bool IsPendingChannel() const {
350 return this == xdslb_policy_->pending_lb_chand_.get();
352 bool HasActiveEdsCall() const { return eds_calld_->lb_calld() != nullptr; }
354 void StartConnectivityWatchLocked();
355 void CancelConnectivityWatchLocked();
356 static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
359 // The owning LB policy.
360 RefCountedPtr<XdsLb> xdslb_policy_;
362 // The channel and its status.
363 grpc_channel* channel_;
364 bool shutting_down_ = false;
365 grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE;
366 grpc_closure on_connectivity_changed_;
368 // The retryable XDS calls to the LB server.
369 OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_;
370 OrphanablePtr<RetryableLbCall<LrsCallState>> lrs_calld_;
373 // We need this wrapper for the following reasons:
374 // 1. To process per-locality load reporting.
375 // 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
376 // references to it by the xds picker and the locality entry.
377 class PickerWrapper : public RefCounted<PickerWrapper> {
379 PickerWrapper(UniquePtr<SubchannelPicker> picker,
380 RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
381 : picker_(std::move(picker)),
382 locality_stats_(std::move(locality_stats)) {
383 locality_stats_->RefByPicker();
385 ~PickerWrapper() { locality_stats_->UnrefByPicker(); }
387 PickResult Pick(PickArgs args);
390 static void RecordCallCompletion(
391 void* arg, grpc_error* error,
392 LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
393 LoadBalancingPolicy::CallState* call_state);
395 UniquePtr<SubchannelPicker> picker_;
396 RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_;
399 // The picker will use a stateless weighting algorithm to pick the locality to
400 // use for each request.
401 class Picker : public SubchannelPicker {
403 // Maintains a weighted list of pickers from each locality that is in ready
404 // state. The first element in the pair represents the end of a range
405 // proportional to the locality's weight. The start of the range is the
406 // previous value in the vector and is 0 for the first element.
408 InlinedVector<Pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>;
409 Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
410 : xds_policy_(std::move(xds_policy)),
411 pickers_(std::move(pickers)),
412 drop_config_(xds_policy_->drop_config_) {}
414 PickResult Pick(PickArgs args) override;
417 // Calls the picker of the locality that the key falls within.
418 PickResult PickFromLocality(const uint32_t key, PickArgs args);
420 RefCountedPtr<XdsLb> xds_policy_;
422 RefCountedPtr<XdsDropConfig> drop_config_;
425 class FallbackHelper : public ChannelControlHelper {
427 explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
428 : parent_(std::move(parent)) {}
430 ~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); }
432 RefCountedPtr<SubchannelInterface> CreateSubchannel(
433 const grpc_channel_args& args) override;
434 void UpdateState(grpc_connectivity_state state,
435 UniquePtr<SubchannelPicker> picker) override;
436 void RequestReresolution() override;
437 void AddTraceEvent(TraceSeverity severity, StringView message) override;
439 void set_child(LoadBalancingPolicy* child) { child_ = child; }
442 bool CalledByPendingFallback() const;
443 bool CalledByCurrentFallback() const;
445 RefCountedPtr<XdsLb> parent_;
446 LoadBalancingPolicy* child_ = nullptr;
451 class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
453 LocalityEntry(RefCountedPtr<XdsLb> parent,
454 RefCountedPtr<XdsLocalityName> name);
457 void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist,
458 LoadBalancingPolicy::Config* child_policy_config,
459 const grpc_channel_args* args);
460 void ShutdownLocked();
461 void ResetBackoffLocked();
462 void DeactivateLocked();
463 void Orphan() override;
465 grpc_connectivity_state connectivity_state() const {
466 return connectivity_state_;
468 uint32_t locality_weight() const { return locality_weight_; }
469 RefCountedPtr<PickerWrapper> picker_wrapper() const {
470 return picker_wrapper_;
474 class Helper : public ChannelControlHelper {
476 explicit Helper(RefCountedPtr<LocalityEntry> entry)
477 : entry_(std::move(entry)) {}
479 ~Helper() { entry_.reset(DEBUG_LOCATION, "Helper"); }
481 RefCountedPtr<SubchannelInterface> CreateSubchannel(
482 const grpc_channel_args& args) override;
483 void UpdateState(grpc_connectivity_state state,
484 UniquePtr<SubchannelPicker> picker) override;
485 void RequestReresolution() override;
486 void AddTraceEvent(TraceSeverity severity, StringView message) override;
487 void set_child(LoadBalancingPolicy* child) { child_ = child; }
490 bool CalledByPendingChild() const;
491 bool CalledByCurrentChild() const;
493 RefCountedPtr<LocalityEntry> entry_;
494 LoadBalancingPolicy* child_ = nullptr;
497 // Methods for dealing with the child policy.
498 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
499 const char* name, const grpc_channel_args* args);
500 grpc_channel_args* CreateChildPolicyArgsLocked(
501 const grpc_channel_args* args);
503 static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
505 RefCountedPtr<XdsLb> parent_;
506 RefCountedPtr<XdsLocalityName> name_;
507 OrphanablePtr<LoadBalancingPolicy> child_policy_;
508 OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
509 RefCountedPtr<PickerWrapper> picker_wrapper_;
510 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
511 uint32_t locality_weight_;
512 grpc_closure on_delayed_removal_timer_;
513 grpc_timer delayed_removal_timer_;
514 bool delayed_removal_timer_callback_pending_ = false;
517 explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {}
519 void UpdateLocked(const XdsLocalityList& locality_list,
520 LoadBalancingPolicy::Config* child_policy_config,
521 const grpc_channel_args* args, XdsLb* parent,
522 bool is_initial_update = false);
523 void UpdateXdsPickerLocked();
524 void ShutdownLocked();
525 void ResetBackoffLocked();
529 Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<LocalityEntry>,
530 XdsLocalityName::Less>
536 void ShutdownLocked() override;
538 // Helper function used in UpdateLocked().
539 void ProcessAddressesAndChannelArgsLocked(ServerAddressList addresses,
540 const grpc_channel_args& args);
542 // Parses the xds config given the JSON node of the first child of XdsConfig.
543 // If parsing succeeds, updates \a balancer_name, and updates \a
544 // child_policy_config_ and \a fallback_policy_config_ if they are also
545 // found. Does nothing upon failure.
546 void ParseLbConfig(const ParsedXdsConfig* xds_config);
548 LbChannelState* LatestLbChannel() const {
549 return pending_lb_chand_ != nullptr ? pending_lb_chand_.get()
553 // Methods for dealing with fallback state.
554 void MaybeCancelFallbackAtStartupChecks();
555 static void OnFallbackTimerLocked(void* arg, grpc_error* error);
556 void UpdateFallbackPolicyLocked();
557 OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
558 const char* name, const grpc_channel_args* args);
559 void MaybeExitFallbackMode();
561 // Name of the backend server to connect to.
562 const char* server_name_ = nullptr;
564 // Name of the balancer to connect to.
565 UniquePtr<char> balancer_name_;
567 // Current channel args from the resolver.
568 grpc_channel_args* args_ = nullptr;
571 bool shutting_down_ = false;
573 // The channel for communicating with the LB server.
574 OrphanablePtr<LbChannelState> lb_chand_;
575 OrphanablePtr<LbChannelState> pending_lb_chand_;
577 // Timeout in milliseconds for the LB call. 0 means no deadline.
578 const grpc_millis lb_call_timeout_ms_;
580 // Whether the checks for fallback at startup are ALL pending. There are
581 // several cases where this can be reset:
582 // 1. The fallback timer fires, we enter fallback mode.
583 // 2. Before the fallback timer fires, the LB channel becomes
584 // TRANSIENT_FAILURE or the LB call fails, we enter fallback mode.
585 // 3. Before the fallback timer fires, if any child policy in the locality map
586 // becomes READY, we cancel the fallback timer.
587 bool fallback_at_startup_checks_pending_ = false;
588 // Timeout in milliseconds for before using fallback backend addresses.
589 // 0 means not using fallback.
590 const grpc_millis lb_fallback_timeout_ms_;
591 // The backend addresses from the resolver.
592 ServerAddressList fallback_backend_addresses_;
594 grpc_timer lb_fallback_timer_;
595 grpc_closure lb_on_fallback_;
597 // The policy to use for the fallback backends.
598 RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
599 // Non-null iff we are in fallback mode.
600 OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
601 OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
603 // The policy to use for the backends.
604 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
605 const grpc_millis locality_retention_interval_ms_;
606 // Map of policies to use in the backend
607 LocalityMap locality_map_;
608 // TODO(mhaidry) : Add support for multiple maps of localities
609 // with different priorities
610 XdsLocalityList locality_list_;
611 // TODO(mhaidry) : Add a pending locality map that may be swapped with the
612 // the current one when new localities in the pending map are ready
613 // to accept connections
615 // The config for dropping calls.
616 RefCountedPtr<XdsDropConfig> drop_config_;
618 // The stats for client-side load reporting.
619 XdsClientStats client_stats_;
623 // XdsLb::PickerWrapper::Pick
626 LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
627 LoadBalancingPolicy::PickArgs args) {
628 // Forward the pick to the picker returned from the child policy.
629 PickResult result = picker_->Pick(args);
630 if (result.type != PickResult::PICK_COMPLETE ||
631 result.subchannel == nullptr || locality_stats_ == nullptr) {
634 // Record a call started.
635 locality_stats_->AddCallStarted();
636 // Intercept the recv_trailing_metadata op to record call completion.
637 result.recv_trailing_metadata_ready = RecordCallCompletion;
638 result.recv_trailing_metadata_ready_user_data =
639 locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
643 // Note that the following callback does not run in either the control plane
644 // combiner or the data plane combiner.
645 void XdsLb::PickerWrapper::RecordCallCompletion(
646 void* arg, grpc_error* error,
647 LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
648 LoadBalancingPolicy::CallState* call_state) {
649 XdsClientStats::LocalityStats* locality_stats =
650 static_cast<XdsClientStats::LocalityStats*>(arg);
651 const bool call_failed = error != GRPC_ERROR_NONE;
652 locality_stats->AddCallFinished(call_failed);
653 locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
660 XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
662 const UniquePtr<char>* drop_category;
663 if (drop_config_->ShouldDrop(&drop_category)) {
664 xds_policy_->client_stats_.AddCallDropped(*drop_category);
666 result.type = PickResult::PICK_COMPLETE;
669 // Generate a random number in [0, total weight).
670 const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
671 // Forward pick to whichever locality maps to the range in which the
672 // random number falls in.
673 return PickFromLocality(key, args);
676 XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
679 size_t start_index = 0;
680 size_t end_index = pickers_.size() - 1;
682 while (end_index > start_index) {
683 mid = (start_index + end_index) / 2;
684 if (pickers_[mid].first > key) {
686 } else if (pickers_[mid].first < key) {
687 start_index = mid + 1;
693 if (index == 0) index = start_index;
694 GPR_ASSERT(pickers_[index].first > key);
695 return pickers_[index].second->Pick(args);
699 // XdsLb::FallbackHelper
702 bool XdsLb::FallbackHelper::CalledByPendingFallback() const {
703 GPR_ASSERT(child_ != nullptr);
704 return child_ == parent_->pending_fallback_policy_.get();
707 bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
708 GPR_ASSERT(child_ != nullptr);
709 return child_ == parent_->fallback_policy_.get();
712 RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
713 const grpc_channel_args& args) {
714 if (parent_->shutting_down_ ||
715 (!CalledByPendingFallback() && !CalledByCurrentFallback())) {
718 return parent_->channel_control_helper()->CreateSubchannel(args);
721 void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
722 UniquePtr<SubchannelPicker> picker) {
723 if (parent_->shutting_down_) return;
724 // If this request is from the pending fallback policy, ignore it until
725 // it reports READY, at which point we swap it into place.
726 if (CalledByPendingFallback()) {
727 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
730 "[xdslb %p helper %p] pending fallback policy %p reports state=%s",
731 parent_.get(), this, parent_->pending_fallback_policy_.get(),
732 grpc_connectivity_state_name(state));
734 if (state != GRPC_CHANNEL_READY) return;
735 grpc_pollset_set_del_pollset_set(
736 parent_->fallback_policy_->interested_parties(),
737 parent_->interested_parties());
738 parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
739 } else if (!CalledByCurrentFallback()) {
740 // This request is from an outdated fallback policy, so ignore it.
743 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
746 void XdsLb::FallbackHelper::RequestReresolution() {
747 if (parent_->shutting_down_) return;
748 const LoadBalancingPolicy* latest_fallback_policy =
749 parent_->pending_fallback_policy_ != nullptr
750 ? parent_->pending_fallback_policy_.get()
751 : parent_->fallback_policy_.get();
752 if (child_ != latest_fallback_policy) return;
753 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
755 "[xdslb %p] Re-resolution requested from the fallback policy (%p).",
756 parent_.get(), child_);
758 GPR_ASSERT(parent_->lb_chand_ != nullptr);
759 parent_->channel_control_helper()->RequestReresolution();
762 void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
763 StringView message) {
764 if (parent_->shutting_down_ ||
765 (!CalledByPendingFallback() && !CalledByCurrentFallback())) {
768 parent_->channel_control_helper()->AddTraceEvent(severity, message);
772 // XdsLb::LbChannelState
775 XdsLb::LbChannelState::LbChannelState(RefCountedPtr<XdsLb> xdslb_policy,
776 const char* balancer_name,
777 const grpc_channel_args& args)
778 : InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace),
779 xdslb_policy_(std::move(xdslb_policy)) {
780 GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked,
781 this, grpc_combiner_scheduler(xdslb_policy_->combiner()));
782 channel_ = CreateXdsBalancerChannel(balancer_name, args);
783 GPR_ASSERT(channel_ != nullptr);
784 eds_calld_.reset(New<RetryableLbCall<EdsCallState>>(
785 Ref(DEBUG_LOCATION, "LbChannelState+eds")));
786 lrs_calld_.reset(New<RetryableLbCall<LrsCallState>>(
787 Ref(DEBUG_LOCATION, "LbChannelState+lrs")));
790 XdsLb::LbChannelState::~LbChannelState() {
791 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
792 gpr_log(GPR_INFO, "[xdslb %p] Destroying LB channel %p", xdslb_policy(),
795 grpc_channel_destroy(channel_);
798 void XdsLb::LbChannelState::Orphan() {
799 shutting_down_ = true;
802 Unref(DEBUG_LOCATION, "LbChannelState+orphaned");
805 void XdsLb::LbChannelState::StartConnectivityWatchLocked() {
806 grpc_channel_element* client_channel_elem =
807 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
808 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
809 // Ref held by callback.
810 Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release();
811 grpc_client_channel_watch_connectivity_state(
813 grpc_polling_entity_create_from_pollset_set(
814 xdslb_policy_->interested_parties()),
815 &connectivity_, &on_connectivity_changed_, nullptr);
818 void XdsLb::LbChannelState::CancelConnectivityWatchLocked() {
819 grpc_channel_element* client_channel_elem =
820 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
821 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
822 grpc_client_channel_watch_connectivity_state(
824 grpc_polling_entity_create_from_pollset_set(
825 xdslb_policy_->interested_parties()),
826 nullptr, &on_connectivity_changed_, nullptr);
829 void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg,
831 LbChannelState* self = static_cast<LbChannelState*>(arg);
832 if (!self->shutting_down_ &&
833 self->xdslb_policy_->fallback_at_startup_checks_pending_) {
834 if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
835 // Not in TRANSIENT_FAILURE. Renew connectivity watch.
836 grpc_channel_element* client_channel_elem =
837 grpc_channel_stack_last_element(
838 grpc_channel_get_channel_stack(self->channel_));
839 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
840 grpc_client_channel_watch_connectivity_state(
842 grpc_polling_entity_create_from_pollset_set(
843 self->xdslb_policy_->interested_parties()),
844 &self->connectivity_, &self->on_connectivity_changed_, nullptr);
845 return; // Early out so we don't drop the ref below.
847 // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
848 // fallback mode immediately.
850 "[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
851 "entering fallback mode",
853 self->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
854 grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_);
855 self->xdslb_policy_->UpdateFallbackPolicyLocked();
857 // Done watching connectivity state, so drop ref.
858 self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done");
862 // XdsLb::LbChannelState::RetryableLbCall<>
865 template <typename T>
866 XdsLb::LbChannelState::RetryableLbCall<T>::RetryableLbCall(
867 RefCountedPtr<LbChannelState> lb_chand)
868 : lb_chand_(std::move(lb_chand)),
871 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
873 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
874 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
875 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
877 &on_retry_timer_, OnRetryTimerLocked, this,
878 grpc_combiner_scheduler(lb_chand_->xdslb_policy()->combiner()));
879 StartNewCallLocked();
882 template <typename T>
883 void XdsLb::LbChannelState::RetryableLbCall<T>::Orphan() {
884 shutting_down_ = true;
886 if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
887 this->Unref(DEBUG_LOCATION, "RetryableLbCall+orphaned");
890 template <typename T>
891 void XdsLb::LbChannelState::RetryableLbCall<T>::OnCallFinishedLocked() {
892 const bool seen_response = lb_calld_->seen_response();
895 // If we lost connection to the LB server, reset backoff and restart the LB
898 StartNewCallLocked();
900 // If we failed to connect to the LB server, retry later.
901 StartRetryTimerLocked();
905 template <typename T>
906 void XdsLb::LbChannelState::RetryableLbCall<T>::StartNewCallLocked() {
907 if (shutting_down_) return;
908 GPR_ASSERT(lb_chand_->channel_ != nullptr);
909 GPR_ASSERT(lb_calld_ == nullptr);
910 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
912 "[xdslb %p] Start new call from retryable call (lb_chand: %p, "
913 "retryable call: %p)",
914 lb_chand()->xdslb_policy(), lb_chand(), this);
916 lb_calld_ = MakeOrphanable<T>(
917 this->Ref(DEBUG_LOCATION, "RetryableLbCall+start_new_call"));
// Arms the retry timer for the next attempt time produced by the backoff
// state, unless this retryable call is shutting down. A ref
// ("RetryableLbCall+retry_timer_start") is taken and released from its smart
// pointer so that it stays alive for the timer callback, which unrefs it in
// OnRetryTimerLocked.
920 template <typename T>
921 void XdsLb::LbChannelState::RetryableLbCall<T>::StartRetryTimerLocked() {
922 if (shutting_down_) return;
923 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
924 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
// Remaining time until the timer fires, clamped to be non-negative; only used
// for the trace message.
925 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
927 "[xdslb %p] Failed to connect to LB server (lb_chand: %p) "
928 "retry timer will fire in %" PRId64 "ms.",
929 lb_chand()->xdslb_policy(), lb_chand(), timeout);
931 this->Ref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_start").release();
932 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
933 retry_timer_callback_pending_ = true;
// Retry-timer callback. `arg` is the RetryableLbCall that armed the timer.
// Clears the pending flag and, if the call is not shutting down and the timer
// completed without error, starts a new call attempt. Always releases the ref
// with reason "RetryableLbCall+retry_timer_done".
936 template <typename T>
937 void XdsLb::LbChannelState::RetryableLbCall<T>::OnRetryTimerLocked(
938 void* arg, grpc_error* error) {
939 RetryableLbCall* lb_calld = static_cast<RetryableLbCall*>(arg);
940 lb_calld->retry_timer_callback_pending_ = false;
941 if (!lb_calld->shutting_down_ && error == GRPC_ERROR_NONE) {
942 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
944 "[xdslb %p] Retry timer fires (lb_chand: %p, retryable call: %p)",
945 lb_calld->lb_chand()->xdslb_policy(), lb_calld->lb_chand(),
948 lb_calld->StartNewCallLocked();
950 lb_calld->Unref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_done");
954 // XdsLb::LbChannelState::EdsCallState
// Creates and starts the EDS call on the LB channel.
// Steps visible here:
//   1. Assert that the policy exists and has a non-empty server name.
//   2. Compute the call deadline from lb_call_timeout_ms_ (0 means no
//      deadline, i.e. GRPC_MILLIS_INF_FUTURE).
//   3. Create the call for the StreamEndpoints method and encode the initial
//      request for the server name.
//   4. Initialize metadata arrays and the combiner-scheduled closures.
//   5. Start three batches: send initial metadata + request (no completion
//      closure), recv initial metadata + first response
//      (on_response_received_), and recv status (on_status_received_).
957 XdsLb::LbChannelState::EdsCallState::EdsCallState(
958 RefCountedPtr<RetryableLbCall<EdsCallState>> parent)
959 : InternallyRefCounted<EdsCallState>(&grpc_lb_xds_trace),
960 parent_(std::move(parent)) {
961 // Init the LB call. Note that the LB call will progress every time there's
962 // activity in xdslb_policy()->interested_parties(), which is comprised of
963 // the polling entities from client_channel.
964 GPR_ASSERT(xdslb_policy() != nullptr);
965 GPR_ASSERT(xdslb_policy()->server_name_ != nullptr);
966 GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0');
967 const grpc_millis deadline =
968 xdslb_policy()->lb_call_timeout_ms_ == 0
969 ? GRPC_MILLIS_INF_FUTURE
970 : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
971 // Create an LB call with the specified method name.
972 lb_call_ = grpc_channel_create_pollset_set_call(
973 lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
974 xdslb_policy()->interested_parties(),
975 GRPC_MDSTR_SLASH_ENVOY_DOT_API_DOT_V2_DOT_ENDPOINTDISCOVERYSERVICE_SLASH_STREAMENDPOINTS,
976 nullptr, deadline, nullptr);
977 GPR_ASSERT(lb_call_ != nullptr);
978 // Init the LB call request payload.
979 grpc_slice request_payload_slice =
980 XdsEdsRequestCreateAndEncode(xdslb_policy()->server_name_);
981 send_message_payload_ =
982 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
// The local slice ref is dropped once the byte buffer has been created from
// it.
983 grpc_slice_unref_internal(request_payload_slice);
984 // Init other data associated with the LB call.
985 grpc_metadata_array_init(&initial_metadata_recv_);
986 grpc_metadata_array_init(&trailing_metadata_recv_);
987 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
988 grpc_combiner_scheduler(xdslb_policy()->combiner()));
989 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
990 grpc_combiner_scheduler(xdslb_policy()->combiner()));
992 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
994 "[xdslb %p] Starting EDS call (lb_chand: %p, lb_calld: %p, "
996 xdslb_policy(), lb_chand(), this, lb_call_);
999 grpc_call_error call_error;
1001 memset(ops, 0, sizeof(ops));
1002 // Op: send initial metadata.
1004 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1005 op->data.send_initial_metadata.count = 0;
1007 op->reserved = nullptr;
1009 // Op: send request message.
1010 GPR_ASSERT(send_message_payload_ != nullptr);
1011 op->op = GRPC_OP_SEND_MESSAGE;
1012 op->data.send_message.send_message = send_message_payload_;
1014 op->reserved = nullptr;
// First batch (sends): started with a null completion closure.
1016 call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
1017 (size_t)(op - ops), nullptr);
1018 GPR_ASSERT(GRPC_CALL_OK == call_error);
1019 // Op: recv initial metadata.
1021 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1022 op->data.recv_initial_metadata.recv_initial_metadata =
1023 &initial_metadata_recv_;
1025 op->reserved = nullptr;
1027 // Op: recv response.
1028 op->op = GRPC_OP_RECV_MESSAGE;
1029 op->data.recv_message.recv_message = &recv_message_payload_;
1031 op->reserved = nullptr;
// Second batch (receives): takes a dedicated ref that is held for
// OnResponseReceivedLocked and unreffed there when it stops listening.
1033 Ref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked").release();
1034 call_error = grpc_call_start_batch_and_execute(
1035 lb_call_, ops, (size_t)(op - ops), &on_response_received_);
1036 GPR_ASSERT(GRPC_CALL_OK == call_error);
1037 // Op: recv server status.
1039 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1040 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1041 op->data.recv_status_on_client.status = &status_code_;
1042 op->data.recv_status_on_client.status_details = &status_details_;
1044 op->reserved = nullptr;
1046 // This callback signals the end of the LB call, so it relies on the initial
1047 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1049 call_error = grpc_call_start_batch_and_execute(
1050 lb_call_, ops, (size_t)(op - ops), &on_status_received_);
1051 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Releases everything the EDS call state owns: both metadata arrays, the send
// and receive byte buffers, the status-details slice, and finally the ref on
// the underlying call (which must be non-null).
1054 XdsLb::LbChannelState::EdsCallState::~EdsCallState() {
1055 grpc_metadata_array_destroy(&initial_metadata_recv_);
1056 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1057 grpc_byte_buffer_destroy(send_message_payload_);
1058 grpc_byte_buffer_destroy(recv_message_payload_);
1059 grpc_slice_unref_internal(status_details_);
1060 GPR_ASSERT(lb_call_ != nullptr);
1061 grpc_call_unref(lb_call_);
// Orphans the EDS call by cancelling the underlying call. No unref happens
// here; the initial ref is released when on_status_received_ runs.
1064 void XdsLb::LbChannelState::EdsCallState::Orphan() {
1065 GPR_ASSERT(lb_call_ != nullptr);
1066 // If we are here because xdslb_policy wants to cancel the call,
1067 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1068 // we are here because xdslb_policy has to orphan a failed call, in which
1069 // case the following cancellation will be a no-op.
1070 grpc_call_cancel(lb_call_, nullptr);
1071 // Note that the initial ref is held by on_status_received_. So the
1072 // corresponding unref happens in on_status_received_ instead of here.
// Callback (scheduled on the policy's combiner) for a received EDS message.
// `arg` is the EdsCallState that started the receive op.
// Flow visible here:
//   - If this call is no longer the current call on its channel, or the
//     payload is null, drop the "EDS+OnResponseReceivedLocked" ref.
//   - Otherwise read the payload into a slice, decode it, and log the
//     contents when tracing is enabled.
//   - Promote the pending LB channel if the response arrived on it.
//   - Give the LRS call a chance to start load reporting.
//   - Apply the drop config and the locality list/map.
//   - Unless the policy is shutting down, start another RECV_MESSAGE op that
//     reuses the ref taken in the constructor.
1075 void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
1076 void* arg, grpc_error* error) {
1077 EdsCallState* eds_calld = static_cast<EdsCallState*>(arg);
1078 LbChannelState* lb_chand = eds_calld->lb_chand();
1079 XdsLb* xdslb_policy = eds_calld->xdslb_policy();
1080 // Empty payload means the LB call was cancelled.
1081 if (!eds_calld->IsCurrentCallOnChannel() ||
1082 eds_calld->recv_message_payload_ == nullptr) {
1083 eds_calld->Unref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked");
1086 // Read the response.
1087 grpc_byte_buffer_reader bbr;
1088 grpc_byte_buffer_reader_init(&bbr, eds_calld->recv_message_payload_);
1089 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1090 grpc_byte_buffer_reader_destroy(&bbr);
// The payload buffer is destroyed and nulled so that the next receive op can
// reuse the member.
1091 grpc_byte_buffer_destroy(eds_calld->recv_message_payload_);
1092 eds_calld->recv_message_payload_ = nullptr;
1093 // TODO(juanlishen): When we convert this to use the xds protocol, the
1094 // balancer will send us a fallback timeout such that we should go into
1095 // fallback mode if we have lost contact with the balancer after a certain
1096 // period of time. We will need to save the timeout value here, and then
1097 // when the balancer call ends, we will need to start a timer for the
1098 // specified period of time, and if the timer fires, we go into fallback
1099 // mode. We will also need to cancel the timer when we receive a serverlist
1100 // from the balancer.
1101 // This anonymous lambda is a hack to avoid the usage of goto.
1103 // Parse the response.
1105 grpc_error* parse_error =
1106 XdsEdsResponseDecodeAndParse(response_slice, &update);
1107 if (parse_error != GRPC_ERROR_NONE) {
1108 gpr_log(GPR_ERROR, "[xdslb %p] EDS response parsing failed. error=%s",
1109 xdslb_policy, grpc_error_string(parse_error));
1110 GRPC_ERROR_UNREF(parse_error);
// A response that has no localities and does not request drop-all is logged
// and ignored.
1113 if (update.locality_list.empty() && !update.drop_all) {
1114 char* response_slice_str =
1115 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1117 "[xdslb %p] EDS response '%s' doesn't contain any valid locality "
1118 "but doesn't require to drop all calls. Ignoring.",
1119 xdslb_policy, response_slice_str);
1120 gpr_free(response_slice_str);
// From here on the response counts as seen.
1123 eds_calld->seen_response_ = true;
1124 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1126 "[xdslb %p] EDS response with %" PRIuPTR
1127 " localities and %" PRIuPTR
1128 " drop categories received (drop_all=%d)",
1129 xdslb_policy, update.locality_list.size(),
1130 update.drop_config->drop_category_list().size(), update.drop_all);
// Trace every locality and each of its server addresses.
1131 for (size_t i = 0; i < update.locality_list.size(); ++i) {
1132 const XdsLocalityInfo& locality = update.locality_list[i];
1134 "[xdslb %p] Locality %" PRIuPTR " %s contains %" PRIuPTR
1135 " server addresses",
1137 locality.locality_name->AsHumanReadableString(),
1138 locality.serverlist.size());
1139 for (size_t j = 0; j < locality.serverlist.size(); ++j) {
1141 grpc_sockaddr_to_string(&ipport, &locality.serverlist[j].address(),
1144 "[xdslb %p] Locality %" PRIuPTR
1145 " %s, server address %" PRIuPTR ": %s",
1147 locality.locality_name->AsHumanReadableString(), j, ipport);
// Trace every drop category with its rate.
1151 for (size_t i = 0; i < update.drop_config->drop_category_list().size();
1153 const XdsDropConfig::DropCategory& drop_category =
1154 update.drop_config->drop_category_list()[i];
1156 "[xdslb %p] Drop category %s has drop rate %d per million",
1157 xdslb_policy, drop_category.name.get(),
1158 drop_category.parts_per_million);
1161 // Pending LB channel receives a response; promote it.
1162 // Note that this call can't be on a discarded pending channel, because
1163 // such channels don't have any current call but we have checked this call
1164 // is a current call.
1165 if (!lb_chand->IsCurrentChannel()) {
1166 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1168 "[xdslb %p] Pending LB channel %p receives EDS response; "
1169 "promoting it to replace current LB channel %p",
1170 xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get());
1172 // TODO(juanlishen): Maybe promote the pending LB channel when the
1173 // response results a READY locality map.
1174 xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_);
1176 // At this point, lb_chand must be the current LB channel, so try to start
1178 LrsCallState* lrs_calld = lb_chand->lrs_calld_->lb_calld();
1179 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1180 // If the balancer tells us to drop all the calls, we should exit fallback
1181 // mode immediately.
1182 if (update.drop_all) xdslb_policy->MaybeExitFallbackMode();
1183 // Update the drop config.
// The comparison must happen before the move below replaces drop_config_.
1184 const bool drop_config_changed =
1185 xdslb_policy->drop_config_ == nullptr ||
1186 *xdslb_policy->drop_config_ != *update.drop_config;
1187 xdslb_policy->drop_config_ = std::move(update.drop_config);
1188 // Ignore identical locality update.
1189 if (xdslb_policy->locality_list_ == update.locality_list) {
1190 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1192 "[xdslb %p] Incoming locality list identical to current, "
1193 "ignoring. (drop_config_changed=%d)",
1194 xdslb_policy, drop_config_changed);
// Even with an unchanged locality list, a changed drop config still requires
// a picker refresh.
1196 if (drop_config_changed) {
1197 xdslb_policy->locality_map_.UpdateXdsPickerLocked();
1201 // Update the locality list.
1202 xdslb_policy->locality_list_ = std::move(update.locality_list);
1203 // Update the locality map.
1204 xdslb_policy->locality_map_.UpdateLocked(
1205 xdslb_policy->locality_list_, xdslb_policy->child_policy_config_.get(),
1206 xdslb_policy->args_, xdslb_policy);
1208 grpc_slice_unref_internal(response_slice);
// When the policy is shutting down, stop listening and drop this callback's
// ref.
1209 if (xdslb_policy->shutting_down_) {
1210 eds_calld->Unref(DEBUG_LOCATION,
1211 "EDS+OnResponseReceivedLocked+xds_shutdown");
1214 // Keep listening for serverlist updates.
1216 memset(&op, 0, sizeof(op));
1217 op.op = GRPC_OP_RECV_MESSAGE;
1218 op.data.recv_message.recv_message = &eds_calld->recv_message_payload_;
1220 op.reserved = nullptr;
1221 GPR_ASSERT(eds_calld->lb_call_ != nullptr);
1222 // Reuse the "EDS+OnResponseReceivedLocked" ref taken in ctor.
1223 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1224 eds_calld->lb_call_, &op, 1, &eds_calld->on_response_received_);
1225 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Callback (scheduled on the policy's combiner) for the final status of the
// EDS call. `arg` is the EdsCallState. Logs the status when tracing is
// enabled. For a call that is still current on its channel, it either swaps
// in the pending LB channel or asks the parent RetryableLbCall to handle the
// finished call, requests re-resolution, and enters fallback mode if the
// fallback-at-startup checks are still pending. Always releases the ref with
// reason "EDS+OnStatusReceivedLocked".
1228 void XdsLb::LbChannelState::EdsCallState::OnStatusReceivedLocked(
1229 void* arg, grpc_error* error) {
1230 EdsCallState* eds_calld = static_cast<EdsCallState*>(arg);
1231 LbChannelState* lb_chand = eds_calld->lb_chand();
1232 XdsLb* xdslb_policy = eds_calld->xdslb_policy();
1233 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1234 char* status_details = grpc_slice_to_c_string(eds_calld->status_details_);
1236 "[xdslb %p] EDS call status received. Status = %d, details "
1237 "= '%s', (lb_chand: %p, eds_calld: %p, lb_call: %p), error '%s'",
1238 xdslb_policy, eds_calld->status_code_, status_details, lb_chand,
1239 eds_calld, eds_calld->lb_call_, grpc_error_string(error));
1240 gpr_free(status_details);
1242 // Ignore status from a stale call.
1243 if (eds_calld->IsCurrentCallOnChannel()) {
1244 // Because this call is the current one on the channel, the channel can't
1245 // have been swapped out; otherwise, the call should have been reset.
1246 GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel());
1247 if (lb_chand != xdslb_policy->LatestLbChannel()) {
1248 // This channel must be the current one and there is a pending one. Swap
1249 // in the pending one and we are done.
1250 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
// NOTE(review): the first %p ("pending LB channel") is given lb_chand, which
// per the comment above is the current channel, and the second %p is given
// lb_chand_. The logged pointers look mislabeled — confirm.
1252 "[xdslb %p] Promoting pending LB channel %p to replace "
1253 "current LB channel %p",
1254 xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get());
1256 xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_);
1258 // This channel is the most recently created one. Try to restart the call
1260 eds_calld->parent_->OnCallFinishedLocked();
1261 xdslb_policy->channel_control_helper()->RequestReresolution();
1262 // If the fallback-at-startup checks are pending, go into fallback mode
1263 // immediately. This short-circuits the timeout for the
1264 // fallback-at-startup case.
1265 if (xdslb_policy->fallback_at_startup_checks_pending_) {
1267 "[xdslb %p] Balancer call finished; entering fallback mode",
1269 xdslb_policy->fallback_at_startup_checks_pending_ = false;
1270 grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
1271 lb_chand->CancelConnectivityWatchLocked();
1272 xdslb_policy->UpdateFallbackPolicyLocked();
1276 eds_calld->Unref(DEBUG_LOCATION, "EDS+OnStatusReceivedLocked");
// Returns true iff this EDS call is the one currently held by the LB
// channel's retryable EDS call wrapper. Returns false when the channel has no
// retryable EDS call at all.
1279 bool XdsLb::LbChannelState::EdsCallState::IsCurrentCallOnChannel() const {
1280 // If the retryable EDS call is null (which only happens when the LB channel
1281 // is shutting down), all the EDS calls are stale.
1282 if (lb_chand()->eds_calld_ == nullptr) return false;
1283 return this == lb_chand()->eds_calld_->lb_calld();
1287 // XdsLb::LbChannelState::LrsCallState::Reporter
// Orphans the reporter: cancels the next-report timer if its callback is
// still pending.
// NOTE(review): any further cleanup in this function is not visible here.
1290 void XdsLb::LbChannelState::LrsCallState::Reporter::Orphan() {
1291 if (next_report_timer_callback_pending_) {
1292 grpc_timer_cancel(&next_report_timer_);
// Arms the next-report timer to fire report_interval_ after the current
// ExecCtx time and records that its callback is pending.
1296 void XdsLb::LbChannelState::LrsCallState::Reporter::ScheduleNextReportLocked() {
1297 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1298 grpc_timer_init(&next_report_timer_, next_report_time,
1299 &on_next_report_timer_);
1300 next_report_timer_callback_pending_ = true;
// Next-report timer callback. `arg` is the Reporter. Clears the pending flag.
// If the timer ended with an error or this reporter is no longer the current
// one on its call, the "Reporter+timer" ref is dropped; otherwise a load
// report is sent.
1303 void XdsLb::LbChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1304 void* arg, grpc_error* error) {
1305 Reporter* self = static_cast<Reporter*>(arg);
1306 self->next_report_timer_callback_pending_ = false;
1307 if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
1308 self->Unref(DEBUG_LOCATION, "Reporter+timer");
1311 self->SendReportLocked();
// Builds a load-report request from the policy's client stats and sends it on
// the parent LRS call. If both the previous and the current report encode to
// an empty slice, nothing is sent and the next report is scheduled instead.
// The send completes via on_report_done_.
1314 void XdsLb::LbChannelState::LrsCallState::Reporter::SendReportLocked() {
1315 // Create a request that contains the load report.
1316 grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode(
1317 xdslb_policy()->server_name_, &xdslb_policy()->client_stats_);
1318 // Skip client load report if the counters were all zero in the last
1319 // report and they are still zero in this one.
// An encoding equal to the empty slice is what marks "counters were zero".
1320 const bool old_val = last_report_counters_were_zero_;
1321 last_report_counters_were_zero_ = static_cast<bool>(
1322 grpc_slice_eq(request_payload_slice, grpc_empty_slice()));
1323 if (old_val && last_report_counters_were_zero_) {
1324 ScheduleNextReportLocked();
// The payload is stored on the parent call state; OnReportDoneLocked destroys
// it and resets the pointer to null.
1327 parent_->send_message_payload_ =
1328 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1329 grpc_slice_unref_internal(request_payload_slice);
1332 memset(&op, 0, sizeof(op));
1333 op.op = GRPC_OP_SEND_MESSAGE;
1334 op.data.send_message.send_message = parent_->send_message_payload_;
1335 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1336 parent_->lb_call_, &op, 1, &on_report_done_);
1337 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
// NOTE(review): the "lb_calld=%p" slot is filled with `this`, which is the
// Reporter rather than the LRS call state — confirm whether that is intended.
1339 "[xdslb %p] lb_calld=%p call_error=%d sending client load report",
1340 xdslb_policy(), this, call_error);
1341 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Completion callback for a load-report send. `arg` is the Reporter. Destroys
// and clears the parent's send payload. If the send failed or this reporter
// is no longer current, the "Reporter+report_done" ref is dropped, and a
// reporter that is no longer current first lets the parent try to start
// reporting again. Otherwise the next report is scheduled.
1345 void XdsLb::LbChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1346 void* arg, grpc_error* error) {
1347 Reporter* self = static_cast<Reporter*>(arg);
// Clearing the payload pointer is what later allows
// MaybeStartReportingLocked to proceed (it returns early while the payload is
// non-null).
1348 grpc_byte_buffer_destroy(self->parent_->send_message_payload_);
1349 self->parent_->send_message_payload_ = nullptr;
1350 if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
1351 // If this reporter is no longer the current one on the call, the reason
1352 // might be that it was orphaned for a new one due to config update.
1353 if (!self->IsCurrentReporterOnCall()) {
1354 self->parent_->MaybeStartReportingLocked();
1356 self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1359 self->ScheduleNextReportLocked();
1363 // XdsLb::LbChannelState::LrsCallState
// Creates and starts the LRS (load reporting) call on the LB channel.
// Steps visible here:
//   1. Assert that the policy exists and has a non-empty server name.
//   2. Compute the call deadline from lb_call_timeout_ms_ (0 means no
//      deadline, i.e. GRPC_MILLIS_INF_FUTURE).
//   3. Create the call for the StreamLoadStats method and encode the initial
//      request for the server name.
//   4. Initialize metadata arrays and the combiner-scheduled closures.
//   5. Start three batches: send initial metadata + request
//      (on_initial_request_sent_), recv initial metadata + first response
//      (on_response_received_), and recv status (on_status_received_).
1366 XdsLb::LbChannelState::LrsCallState::LrsCallState(
1367 RefCountedPtr<RetryableLbCall<LrsCallState>> parent)
1368 : InternallyRefCounted<LrsCallState>(&grpc_lb_xds_trace),
1369 parent_(std::move(parent)) {
1370 // Init the LB call. Note that the LB call will progress every time there's
1371 // activity in xdslb_policy()->interested_parties(), which is comprised of
1372 // the polling entities from client_channel.
1373 GPR_ASSERT(xdslb_policy() != nullptr);
1374 GPR_ASSERT(xdslb_policy()->server_name_ != nullptr);
1375 GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0');
1376 const grpc_millis deadline =
1377 xdslb_policy()->lb_call_timeout_ms_ == 0
1378 ? GRPC_MILLIS_INF_FUTURE
1379 : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
1380 lb_call_ = grpc_channel_create_pollset_set_call(
1381 lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1382 xdslb_policy()->interested_parties(),
1383 GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS,
1384 nullptr, deadline, nullptr);
1385 GPR_ASSERT(lb_call_ != nullptr);
1386 // Init the LB call request payload.
1387 grpc_slice request_payload_slice =
1388 XdsLrsRequestCreateAndEncode(xdslb_policy()->server_name_);
1389 send_message_payload_ =
1390 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
// The local slice ref is dropped once the byte buffer has been created from
// it.
1391 grpc_slice_unref_internal(request_payload_slice);
1392 // Init other data associated with the LRS call.
1393 grpc_metadata_array_init(&initial_metadata_recv_);
1394 grpc_metadata_array_init(&trailing_metadata_recv_);
1395 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, this,
1396 grpc_combiner_scheduler(xdslb_policy()->combiner()));
1397 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
1398 grpc_combiner_scheduler(xdslb_policy()->combiner()));
1399 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
1400 grpc_combiner_scheduler(xdslb_policy()->combiner()));
1402 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1404 "[xdslb %p] Starting LRS call (lb_chand: %p, lb_calld: %p, "
1406 xdslb_policy(), lb_chand(), this, lb_call_);
1409 grpc_call_error call_error;
1411 memset(ops, 0, sizeof(ops));
1412 // Op: send initial metadata.
1414 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1415 op->data.send_initial_metadata.count = 0;
1417 op->reserved = nullptr;
1419 // Op: send request message.
1420 GPR_ASSERT(send_message_payload_ != nullptr);
1421 op->op = GRPC_OP_SEND_MESSAGE;
1422 op->data.send_message.send_message = send_message_payload_;
1424 op->reserved = nullptr;
// First batch (sends): takes a dedicated ref that OnInitialRequestSentLocked
// releases.
1426 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1427 call_error = grpc_call_start_batch_and_execute(
1428 lb_call_, ops, (size_t)(op - ops), &on_initial_request_sent_);
1429 GPR_ASSERT(GRPC_CALL_OK == call_error);
1430 // Op: recv initial metadata.
1432 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1433 op->data.recv_initial_metadata.recv_initial_metadata =
1434 &initial_metadata_recv_;
1436 op->reserved = nullptr;
1438 // Op: recv response.
1439 op->op = GRPC_OP_RECV_MESSAGE;
1440 op->data.recv_message.recv_message = &recv_message_payload_;
1442 op->reserved = nullptr;
// Second batch (receives): takes a dedicated ref that is held for
// OnResponseReceivedLocked and unreffed there when it stops listening.
1444 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1445 call_error = grpc_call_start_batch_and_execute(
1446 lb_call_, ops, (size_t)(op - ops), &on_response_received_);
1447 GPR_ASSERT(GRPC_CALL_OK == call_error);
1448 // Op: recv server status.
1450 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1451 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1452 op->data.recv_status_on_client.status = &status_code_;
1453 op->data.recv_status_on_client.status_details = &status_details_;
1455 op->reserved = nullptr;
1457 // This callback signals the end of the LB call, so it relies on the initial
1458 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1460 call_error = grpc_call_start_batch_and_execute(
1461 lb_call_, ops, (size_t)(op - ops), &on_status_received_);
1462 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Releases everything the LRS call state owns: both metadata arrays, the send
// and receive byte buffers, the status-details slice, and finally the ref on
// the underlying call (which must be non-null).
1465 XdsLb::LbChannelState::LrsCallState::~LrsCallState() {
1466 grpc_metadata_array_destroy(&initial_metadata_recv_);
1467 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1468 grpc_byte_buffer_destroy(send_message_payload_);
1469 grpc_byte_buffer_destroy(recv_message_payload_);
1470 grpc_slice_unref_internal(status_details_);
1471 GPR_ASSERT(lb_call_ != nullptr);
1472 grpc_call_unref(lb_call_);
// Orphans the LRS call by cancelling the underlying call. No unref happens
// here; the initial ref is released when on_status_received_ runs.
// NOTE(review): a statement between the signature and the assertion is not
// visible here.
1475 void XdsLb::LbChannelState::LrsCallState::Orphan() {
1477 GPR_ASSERT(lb_call_ != nullptr);
1478 // If we are here because xdslb_policy wants to cancel the call,
1479 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1480 // we are here because xdslb_policy has to orphan a failed call, in which
1481 // case the following cancellation will be a no-op.
1482 grpc_call_cancel(lb_call_, nullptr);
1483 // Note that the initial ref is held by on_status_received_. So the
1484 // corresponding unref happens in on_status_received_ instead of here.
// Starts the load reporter if every precondition holds; otherwise returns
// without doing anything. The preconditions are: this is the current call on
// the current channel, no reporter exists yet, no send_message op is in
// flight, an LRS response has been seen, and the EDS call has seen a
// response. On success it initializes the client stats' last-report time (if
// needed) and creates a Reporter that holds its own ref to this call and uses
// the current load_reporting_interval_.
1487 void XdsLb::LbChannelState::LrsCallState::MaybeStartReportingLocked() {
1488 // Don't start if this is not the current call on the current channel.
1489 if (!IsCurrentCallOnChannel() || !lb_chand()->IsCurrentChannel()) return;
1490 // Don't start again if already started.
1491 if (reporter_ != nullptr) return;
1492 // Don't start if the previous send_message op (of the initial request or the
1493 // last report of the previous reporter) hasn't completed.
1494 if (send_message_payload_ != nullptr) return;
1495 // Don't start if no LRS response has arrived.
1496 if (!seen_response()) return;
1497 // Don't start if the EDS call hasn't received any valid response. Note that
1498 // this must be the first channel because it is the current channel but its
1499 // EDS call hasn't seen any response.
1500 EdsCallState* eds_calld = lb_chand()->eds_calld_->lb_calld();
1501 if (eds_calld == nullptr || !eds_calld->seen_response()) return;
1503 lb_chand()->xdslb_policy_->client_stats_.MaybeInitLastReportTime();
1504 reporter_ = MakeOrphanable<Reporter>(
1505 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
// Completion callback for the initial LRS request send. `arg` is the
// LrsCallState. Destroys and clears the send payload, gives load reporting a
// chance to start now that no send is in flight, and releases the ref taken
// for this callback in the constructor.
1508 void XdsLb::LbChannelState::LrsCallState::OnInitialRequestSentLocked(
1509 void* arg, grpc_error* error) {
1510 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1511 // Clear the send_message_payload_.
1512 grpc_byte_buffer_destroy(lrs_calld->send_message_payload_);
1513 lrs_calld->send_message_payload_ = nullptr;
1514 lrs_calld->MaybeStartReportingLocked();
1515 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
// Callback (scheduled on the policy's combiner) for a received LRS message.
// `arg` is the LrsCallState that started the receive op.
// Flow visible here:
//   - If this call is no longer the current call on its channel, or the
//     payload is null, drop the "LRS+OnResponseReceivedLocked" ref.
//   - Otherwise read the payload into a slice and decode the new
//     load-reporting interval.
//   - Raise the interval to the configured minimum if it is below it.
//   - If the interval differs from the current one, reset the reporter, store
//     the new interval, and try to start reporting again.
//   - Unless the policy is shutting down, start another RECV_MESSAGE op that
//     reuses the ref taken in the constructor.
1518 void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
1519 void* arg, grpc_error* error) {
1520 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1521 XdsLb* xdslb_policy = lrs_calld->xdslb_policy();
1522 // Empty payload means the LB call was cancelled.
1523 if (!lrs_calld->IsCurrentCallOnChannel() ||
1524 lrs_calld->recv_message_payload_ == nullptr) {
1525 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1528 // Read the response.
1529 grpc_byte_buffer_reader bbr;
1530 grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_);
1531 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1532 grpc_byte_buffer_reader_destroy(&bbr);
// The payload buffer is destroyed and nulled so that the next receive op can
// reuse the member.
1533 grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_);
1534 lrs_calld->recv_message_payload_ = nullptr;
1535 // This anonymous lambda is a hack to avoid the usage of goto.
1537 // Parse the response.
1538 grpc_millis new_load_reporting_interval;
1539 grpc_error* parse_error = XdsLrsResponseDecodeAndParse(
1540 response_slice, &new_load_reporting_interval,
1541 xdslb_policy->server_name_);
1542 if (parse_error != GRPC_ERROR_NONE) {
1543 gpr_log(GPR_ERROR, "[xdslb %p] LRS response parsing failed. error=%s",
1544 xdslb_policy, grpc_error_string(parse_error));
1545 GRPC_ERROR_UNREF(parse_error);
// From here on the response counts as seen.
1548 lrs_calld->seen_response_ = true;
1549 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1551 "[xdslb %p] LRS response received, load_report_interval=%" PRId64
1553 xdslb_policy, new_load_reporting_interval);
// Enforce the minimum client load-reporting interval.
1555 if (new_load_reporting_interval <
1556 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1557 new_load_reporting_interval =
1558 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1559 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1562 "[xdslb %p] Increased load_report_interval to minimum value %dms",
1563 xdslb_policy, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1566 // Ignore identical update.
1567 if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval) {
1568 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1570 "[xdslb %p] Incoming LRS response identical to current, "
1576 // Stop current load reporting (if any) to adopt the new reporting interval.
1577 lrs_calld->reporter_.reset();
1578 // Record the new config.
1579 lrs_calld->load_reporting_interval_ = new_load_reporting_interval;
1580 // Try starting sending load report.
1581 lrs_calld->MaybeStartReportingLocked();
1583 grpc_slice_unref_internal(response_slice);
// When the policy is shutting down, stop listening and drop this callback's
// ref.
1584 if (xdslb_policy->shutting_down_) {
1585 lrs_calld->Unref(DEBUG_LOCATION,
1586 "LRS+OnResponseReceivedLocked+xds_shutdown");
1589 // Keep listening for LRS config updates.
1591 memset(&op, 0, sizeof(op));
1592 op.op = GRPC_OP_RECV_MESSAGE;
1593 op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_;
1595 op.reserved = nullptr;
1596 GPR_ASSERT(lrs_calld->lb_call_ != nullptr);
1597 // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1598 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1599 lrs_calld->lb_call_, &op, 1, &lrs_calld->on_response_received_);
1600 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Callback (scheduled on the policy's combiner) for the final status of the
// LRS call. `arg` is the LrsCallState. Logs the status when tracing is
// enabled. If this call is still current on its channel and the channel is
// the most recently created one, it asks the parent RetryableLbCall to handle
// the finished call and requests re-resolution. Always releases the ref with
// reason "LRS+OnStatusReceivedLocked".
1603 void XdsLb::LbChannelState::LrsCallState::OnStatusReceivedLocked(
1604 void* arg, grpc_error* error) {
1605 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1606 XdsLb* xdslb_policy = lrs_calld->xdslb_policy();
1607 LbChannelState* lb_chand = lrs_calld->lb_chand();
1608 GPR_ASSERT(lrs_calld->lb_call_ != nullptr);
1609 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1610 char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_);
1612 "[xdslb %p] LRS call status received. Status = %d, details "
1613 "= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'",
1614 xdslb_policy, lrs_calld->status_code_, status_details, lb_chand,
1615 lrs_calld, lrs_calld->lb_call_, grpc_error_string(error));
1616 gpr_free(status_details);
1618 // Ignore status from a stale call.
1619 if (lrs_calld->IsCurrentCallOnChannel()) {
1620 // Because this call is the current one on the channel, the channel can't
1621 // have been swapped out; otherwise, the call should have been reset.
1622 GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel());
// A current call is not expected to report status after policy shutdown.
1623 GPR_ASSERT(!xdslb_policy->shutting_down_);
1624 if (lb_chand == xdslb_policy->LatestLbChannel()) {
1625 // This channel is the most recently created one. Try to restart the call
1627 lrs_calld->parent_->OnCallFinishedLocked();
1628 xdslb_policy->channel_control_helper()->RequestReresolution();
1631 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
// Returns true iff this LRS call is the one currently held by the LB
// channel's retryable LRS call wrapper. Returns false when the channel has no
// retryable LRS call at all.
1634 bool XdsLb::LbChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1635 // If the retryable LRS call is null (which only happens when the LB channel
1636 // is shutting down), all the LRS calls are stale.
1637 if (lb_chand()->lrs_calld_ == nullptr) return false;
1638 return this == lb_chand()->lrs_calld_->lb_calld();
1642 // helper code for creating balancer channel
1645 // Returns the channel args for the LB channel, used to create a bidirectional
1646 // stream for the reception of load balancing updates.
//
// The result is derived from `args` (the parent channel's args) by removing
// the args listed in args_to_remove and adding a marker that the target is an
// xds load balancer. If the parent has a channelz node, the parent's channelz
// uuid is added as well. The copy is then passed through
// ModifyXdsBalancerChannelArgs, whose result is returned.
1647 grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
1648 static const char* args_to_remove[] = {
1649 // LB policy name, since we want to use the default (pick_first) in
1651 GRPC_ARG_LB_POLICY_NAME,
1652 // The service config that contains the LB config. We don't want to
1653 // recursively use xds in the LB channel.
1654 GRPC_ARG_SERVICE_CONFIG,
1655 // The channel arg for the server URI, since that will be different for
1656 // the LB channel than for the parent channel. The client channel
1657 // factory will re-add this arg with the right value.
1658 GRPC_ARG_SERVER_URI,
1659 // The LB channel should use the authority indicated by the target
1660 // authority table (see \a ModifyXdsBalancerChannelArgs),
1661 // as opposed to the authority from the parent channel.
1662 GRPC_ARG_DEFAULT_AUTHORITY,
1663 // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1664 // treated as a stand-alone channel and not inherit this argument from the
1665 // args of the parent channel.
1666 GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1667 // Don't want to pass down channelz node from parent; the balancer
1668 // channel will get its own.
1669 GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1671 // Channel args to add.
1672 InlinedVector<grpc_arg, 2> args_to_add;
1673 // A channel arg indicating the target is a xds load balancer.
1674 args_to_add.emplace_back(grpc_channel_arg_integer_create(
1675 const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1));
1676 // The parent channel's channelz uuid.
1677 channelz::ChannelNode* channelz_node = nullptr;
1678 const grpc_arg* arg =
1679 grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
// The uuid arg is only added when the parent arg is a non-null pointer arg.
1680 if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
1681 arg->value.pointer.p != nullptr) {
1682 channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1683 args_to_add.emplace_back(
1684 channelz::MakeParentUuidArg(channelz_node->uuid()));
1686 // Construct channel args.
1687 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1688 args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1689 args_to_add.size());
1690 // Make any necessary modifications for security.
1691 return ModifyXdsBalancerChannelArgs(new_args);
// Constructs the xds LB policy. Reads three integer settings from the channel
// args (LB call timeout, fallback timeout, locality retention interval), each
// bounded to [0, INT_MAX] with the default given in the initializer. Then
// derives server_name_ from the path of the GRPC_ARG_SERVER_URI channel arg,
// stripping one leading '/'.
// NOTE(review): args.args is read in the member initializers after `args` was
// passed through std::move() to the base-class constructor. This relies on
// the moved-from Args still exposing the same `args` pointer — confirm.
1698 XdsLb::XdsLb(Args args)
1699 : LoadBalancingPolicy(std::move(args)),
1700 lb_call_timeout_ms_(grpc_channel_args_find_integer(
1701 args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX})),
1702 lb_fallback_timeout_ms_(grpc_channel_args_find_integer(
1703 args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS,
1704 {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})),
1705 locality_retention_interval_ms_(grpc_channel_args_find_integer(
1706 args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS,
1707 {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})),
1708 locality_map_(this) {
1709 // Record server name.
1710 const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1711 const char* server_uri = grpc_channel_arg_get_string(arg);
1712 GPR_ASSERT(server_uri != nullptr);
// NOTE(review): `uri` is dereferenced below without a visible null check. If
// grpc_uri_parse can return nullptr for a malformed URI, this would crash —
// confirm.
1713 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1714 GPR_ASSERT(uri->path[0] != '\0');
// server_name_ is a heap copy made with gpr_strdup.
1715 server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1716 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1718 "[xdslb %p] Will use '%s' as the server name for LB request.", this,
1721 grpc_uri_destroy(uri);
// Teardown of policy-owned state. The signature line of the enclosing
// function is not visible in this view -- presumably XdsLb::~XdsLb();
// confirm against the full file.
1725 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1726 gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this);
// server_name_ was allocated with gpr_strdup in the constructor.
1728 gpr_free((void*)server_name_);
1729 grpc_channel_args_destroy(args_);
1730 locality_list_.clear();
// Shuts the policy down: sets shutting_down_, cancels the fallback timer if
// the fallback-at-startup checks are still pending, shuts down the locality
// map, and releases the fallback policies and the pending LB channel.
1733 void XdsLb::ShutdownLocked() {
1734 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1735 gpr_log(GPR_INFO, "[xdslb %p] shutting down", this);
1737 shutting_down_ = true;
1738 if (fallback_at_startup_checks_pending_) {
1739 grpc_timer_cancel(&lb_fallback_timer_);
1741 locality_map_.ShutdownLocked();
// Undo the pollset_set link set up in CreateFallbackPolicyLocked() for each
// fallback policy that exists, before the policy itself is released.
1742 if (fallback_policy_ != nullptr) {
1743 grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
1744 interested_parties());
1746 if (pending_fallback_policy_ != nullptr) {
1747 grpc_pollset_set_del_pollset_set(
1748 pending_fallback_policy_->interested_parties(), interested_parties());
1750 fallback_policy_.reset();
1751 pending_fallback_policy_.reset();
1752 // We reset the LB channels here instead of in our destructor because they
1753 // hold refs to XdsLb.
1755 pending_lb_chand_.reset();
// Resets backoff everywhere this policy owns it: the connect backoff of the
// current and pending LB channels (when present), the locality map, and the
// current and pending fallback policies (when present).
1762 void XdsLb::ResetBackoffLocked() {
1763 if (lb_chand_ != nullptr) {
1764 grpc_channel_reset_connect_backoff(lb_chand_->channel());
1766 if (pending_lb_chand_ != nullptr) {
1767 grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
1769 locality_map_.ResetBackoffLocked();
1770 if (fallback_policy_ != nullptr) {
1771 fallback_policy_->ResetBackoffLocked();
1773 if (pending_fallback_policy_ != nullptr) {
1774 pending_fallback_policy_->ResetBackoffLocked();
// Applies a resolver update to the policy's own state.
//
// \a addresses becomes the fallback backend address list. \a args is copied
// into args_ with GRPC_ARG_LB_POLICY_NAME forced to "xds". A new LB channel
// is created when there is none yet or when the balancer name differs from
// the target of the latest LB channel; it becomes lb_chand_ if there is no
// LB channel with an active EDS call, and pending_lb_chand_ otherwise.
1778 void XdsLb::ProcessAddressesAndChannelArgsLocked(
1779 ServerAddressList addresses, const grpc_channel_args& args) {
1780 // Update fallback address list.
1781 fallback_backend_addresses_ = std::move(addresses);
1782 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1783 // since we use this to trigger the client_load_reporting filter.
1784 static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1785 grpc_arg new_arg = grpc_channel_arg_string_create(
1786 (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"xds");
// The previous copy of the channel args is released before it is replaced.
1787 grpc_channel_args_destroy(args_);
1788 args_ = grpc_channel_args_copy_and_add_and_remove(
1789 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1790 // Construct args for balancer channel.
1791 grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args);
1792 // Create an LB channel if we don't have one yet or the balancer name has
1793 // changed from the last received one.
1794 bool create_lb_channel = lb_chand_ == nullptr;
1795 if (lb_chand_ != nullptr) {
1796 UniquePtr<char> last_balancer_name(
1797 grpc_channel_get_target(LatestLbChannel()->channel()));
1799 strcmp(last_balancer_name.get(), balancer_name_.get()) != 0;
1801 if (create_lb_channel) {
1802 OrphanablePtr<LbChannelState> lb_chand = MakeOrphanable<LbChannelState>(
1803 Ref(DEBUG_LOCATION, "XdsLb+LbChannelState"), balancer_name_.get(),
1805 if (lb_chand_ == nullptr || !lb_chand_->HasActiveEdsCall()) {
1806 GPR_ASSERT(pending_lb_chand_ == nullptr);
1807 // If we do not have a working LB channel yet, use the newly created one.
1808 lb_chand_ = std::move(lb_chand);
1810 // Otherwise, wait until the new LB channel is ready before swapping it in.
1811 pending_lb_chand_ = std::move(lb_chand);
// The balancer channel args built above are owned by this function.
1814 grpc_channel_args_destroy(lb_channel_args);
// Copies the balancer name, child policy config and fallback policy config
// out of \a xds_config. Leaves all three members untouched when
// \a xds_config is null or carries no balancer name.
1817 void XdsLb::ParseLbConfig(const ParsedXdsConfig* xds_config) {
1818 if (xds_config == nullptr || xds_config->balancer_name() == nullptr) return;
1819 // TODO(yashykt) : does this need to be a gpr_strdup
1820 // TODO(juanlishen): Read balancer name from bootstrap file.
1821 balancer_name_ = UniquePtr<char>(gpr_strdup(xds_config->balancer_name()));
1822 child_policy_config_ = xds_config->child_policy();
1823 fallback_policy_config_ = xds_config->fallback_policy();
// Handles an update (config, addresses, channel args) delivered to the policy.
//
// The update counts as the initial one when no LB channel exists yet. After
// parsing the config and processing addresses and channel args, the locality
// map and (if present) the fallback policy are updated. On the initial update
// the fallback timer is armed and the LB channel connectivity watch started.
1826 void XdsLb::UpdateLocked(UpdateArgs args) {
1827 const bool is_initial_update = lb_chand_ == nullptr;
1828 ParseLbConfig(static_cast<const ParsedXdsConfig*>(args.config.get()));
// balancer_name_ is still unset if neither this nor an earlier config
// supplied one.
1829 if (balancer_name_ == nullptr) {
1830 gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
1833 ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args);
1834 locality_map_.UpdateLocked(locality_list_, child_policy_config_.get(), args_,
1835 this, is_initial_update);
1836 // Update the existing fallback policy. The fallback policy config and/or the
1837 // fallback addresses may be new.
1838 if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
1839 // If this is the initial update, start the fallback-at-startup checks.
1840 if (is_initial_update) {
1841 grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
1842 Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
1843 GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
1844 grpc_combiner_scheduler(combiner()));
1845 fallback_at_startup_checks_pending_ = true;
1846 grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1847 // Start watching the channel's connectivity state. If the channel
1848 // goes into state TRANSIENT_FAILURE, we go into fallback mode even if
1849 // the fallback timeout has not elapsed.
1850 lb_chand_->StartConnectivityWatchLocked();
1855 // fallback-related methods
// Stops the fallback-at-startup checks if they are still pending: cancels
// the fallback timer and the LB channel connectivity watch, then clears the
// pending flag. Does nothing when the checks are no longer pending.
1858 void XdsLb::MaybeCancelFallbackAtStartupChecks() {
1859 if (!fallback_at_startup_checks_pending_) return;
1861 "[xdslb %p] Cancelling fallback timer and LB channel connectivity "
1864 grpc_timer_cancel(&lb_fallback_timer_);
1865 lb_chand_->CancelConnectivityWatchLocked();
1866 fallback_at_startup_checks_pending_ = false;
// Closure callback for the fallback timer armed in UpdateLocked().
//
// \a arg is the XdsLb instance that armed the timer. Fallback mode is entered
// only if the startup checks are still pending, the policy is not shutting
// down and \a error is GRPC_ERROR_NONE. The "on_fallback_timer" ref taken
// when the timer was armed is released in every case.
1869 void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1870 XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
1871 // If some fallback-at-startup check is done after the timer fires but before
1872 // this callback actually runs, don't fall back.
1873 if (xdslb_policy->fallback_at_startup_checks_pending_ &&
1874 !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1875 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1877 "[xdslb %p] Child policy not ready after fallback timeout; "
1878 "entering fallback mode",
1881 xdslb_policy->fallback_at_startup_checks_pending_ = false;
1882 xdslb_policy->UpdateFallbackPolicyLocked();
1883 xdslb_policy->lb_chand_->CancelConnectivityWatchLocked();
1885 xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
// Creates or updates the fallback policy with the current fallback addresses,
// fallback policy config and channel args. Does nothing while shutting down.
//
// The long comment inside speaks of "child policy", child_policy_ and
// pending_child_policy_; in this function those roles are played by
// fallback_policy_ and pending_fallback_policy_, and "child_policy_name"
// corresponds to fallback_policy_name.
1888 void XdsLb::UpdateFallbackPolicyLocked() {
1889 if (shutting_down_) return;
1890 // Construct update args.
1891 UpdateArgs update_args;
1892 update_args.addresses = fallback_backend_addresses_;
1893 update_args.config = fallback_policy_config_ == nullptr
1895 : fallback_policy_config_->Ref();
1896 update_args.args = grpc_channel_args_copy(args_);
1897 // If the child policy name changes, we need to create a new child
1898 // policy. When this happens, we leave child_policy_ as-is and store
1899 // the new child policy in pending_child_policy_. Once the new child
1900 // policy transitions into state READY, we swap it into child_policy_,
1901 // replacing the original child policy. So pending_child_policy_ is
1902 // non-null only between when we apply an update that changes the child
1903 // policy name and when the new child reports state READY.
1905 // Updates can arrive at any point during this transition. We always
1906 // apply updates relative to the most recently created child policy,
1907 // even if the most recent one is still in pending_child_policy_. This
1908 // is true both when applying the updates to an existing child policy
1909 // and when determining whether we need to create a new policy.
1911 // As a result of this, there are several cases to consider here:
1913 // 1. We have no existing child policy (i.e., we have started up but
1914 // have not yet received a serverlist from the balancer or gone
1915 // into fallback mode; in this case, both child_policy_ and
1916 // pending_child_policy_ are null). In this case, we create a
1917 // new child policy and store it in child_policy_.
1919 // 2. We have an existing child policy and have no pending child policy
1920 // from a previous update (i.e., either there has not been a
1921 // previous update that changed the policy name, or we have already
1922 // finished swapping in the new policy; in this case, child_policy_
1923 // is non-null but pending_child_policy_ is null). In this case:
1924 // a. If child_policy_->name() equals child_policy_name, then we
1925 // update the existing child policy.
1926 // b. If child_policy_->name() does not equal child_policy_name,
1927 // we create a new policy. The policy will be stored in
1928 // pending_child_policy_ and will later be swapped into
1929 // child_policy_ by the helper when the new child transitions
1930 // into state READY.
1932 // 3. We have an existing child policy and have a pending child policy
1933 // from a previous update (i.e., a previous update set
1934 // pending_child_policy_ as per case 2b above and that policy has
1935 // not yet transitioned into state READY and been swapped into
1936 // child_policy_; in this case, both child_policy_ and
1937 // pending_child_policy_ are non-null). In this case:
1938 // a. If pending_child_policy_->name() equals child_policy_name,
1939 // then we update the existing pending child policy.
1940 // b. If pending_child_policy->name() does not equal
1941 // child_policy_name, then we create a new policy. The new
1942 // policy is stored in pending_child_policy_ (replacing the one
1943 // that was there before, which will be immediately shut down)
1944 // and will later be swapped into child_policy_ by the helper
1945 // when the new child transitions into state READY.
1946 const char* fallback_policy_name = fallback_policy_config_ == nullptr
1948 : fallback_policy_config_->name();
// The three alternatives below correspond, in order, to cases 1, 2b and 3b.
1949 const bool create_policy =
1951 fallback_policy_ == nullptr ||
1953 (pending_fallback_policy_ == nullptr &&
1954 strcmp(fallback_policy_->name(), fallback_policy_name) != 0) ||
1956 (pending_fallback_policy_ != nullptr &&
1957 strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0);
1958 LoadBalancingPolicy* policy_to_update = nullptr;
1959 if (create_policy) {
1960 // Cases 1, 2b, and 3b: create a new child policy.
1961 // If child_policy_ is null, we set it (case 1), else we set
1962 // pending_child_policy_ (cases 2b and 3b).
1963 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1964 gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this,
1965 fallback_policy_ == nullptr ? "" : "pending ",
1966 fallback_policy_name);
// lb_policy aliases whichever member is to receive the new policy.
1968 auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
1969 : pending_fallback_policy_;
1971 CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
1972 policy_to_update = lb_policy.get();
1974 // Cases 2a and 3a: update an existing policy.
1975 // If we have a pending child policy, send the update to the pending
1976 // policy (case 3a), else send it to the current policy (case 2a).
1977 policy_to_update = pending_fallback_policy_ != nullptr
1978 ? pending_fallback_policy_.get()
1979 : fallback_policy_.get();
1981 GPR_ASSERT(policy_to_update != nullptr);
1982 // Update the policy.
1983 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1985 GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this,
1986 policy_to_update == pending_fallback_policy_.get() ? "pending " : "",
1989 policy_to_update->UpdateLocked(std::move(update_args));
// Instantiates a fallback LB policy called \a name through the LB policy
// registry, giving it this policy's combiner, \a args and a new
// FallbackHelper that holds a ref to this XdsLb. On success the helper is
// told which policy it serves and the pollset_sets are linked.
1992 OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked(
1993 const char* name, const grpc_channel_args* args) {
1994 FallbackHelper* helper =
1995 New<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper"));
1996 LoadBalancingPolicy::Args lb_policy_args;
1997 lb_policy_args.combiner = combiner();
1998 lb_policy_args.args = args;
// The helper is handed over to the new policy's args here; the raw pointer is
// kept only to call set_child() below.
1999 lb_policy_args.channel_control_helper =
2000 UniquePtr<ChannelControlHelper>(helper);
2001 OrphanablePtr<LoadBalancingPolicy> lb_policy =
2002 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
2003 name, std::move(lb_policy_args));
2004 if (GPR_UNLIKELY(lb_policy == nullptr)) {
2005 gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this,
2009 helper->set_child(lb_policy.get());
2010 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2011 gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this,
2012 name, lb_policy.get());
2014 // Add the xDS's interested_parties pollset_set to that of the newly created
2015 // child policy. This will make the child policy progress upon activity on xDS
2016 // LB, which in turn is tied to the application's call.
2017 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2018 interested_parties());
// Leaves fallback mode by releasing both the current and the pending
// fallback policy. Does nothing when there is no current fallback policy.
2022 void XdsLb::MaybeExitFallbackMode() {
2023 if (fallback_policy_ == nullptr) return;
2024 gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this);
2025 fallback_policy_.reset();
2026 pending_fallback_policy_.reset();
2030 // XdsLb::LocalityMap
2033 void XdsLb::LocalityMap::UpdateLocked(
2034 const XdsLocalityList& locality_list,
2035 LoadBalancingPolicy::Config* child_policy_config,
2036 const grpc_channel_args* args, XdsLb* parent, bool is_initial_update) {
2037 if (parent->shutting_down_) return;
2038 // Add or update the localities in locality_list.
2039 for (size_t i = 0; i < locality_list.size(); i++) {
2040 auto& locality_name = locality_list[i].locality_name;
2041 auto iter = map_.find(locality_name);
2042 // Add a new entry in the locality map if a new locality is received in the
2044 if (iter == map_.end()) {
2045 OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>(
2046 parent->Ref(DEBUG_LOCATION, "LocalityEntry"), locality_name);
2047 iter = map_.emplace(locality_name, std::move(new_entry)).first;
2049 // Keep a copy of serverlist in locality_list_ so that we can compare it
2050 // with the future ones.
2051 iter->second->UpdateLocked(locality_list[i].lb_weight,
2052 locality_list[i].serverlist, child_policy_config,
2055 // Remove (later) the localities not in locality_list.
2056 for (auto& p : map_) {
2057 const XdsLocalityName* locality_name = p.first.get();
2058 LocalityEntry* locality_entry = p.second.get();
2059 bool in_locality_list = false;
2060 for (size_t i = 0; i < locality_list.size(); ++i) {
2061 if (*locality_list[i].locality_name == *locality_name) {
2062 in_locality_list = true;
2066 if (!in_locality_list) locality_entry->DeactivateLocked();
2068 // Generate a new xds picker immediately.
2069 if (!is_initial_update) UpdateXdsPickerLocked();
// Recomputes the state and picker reported to the channel from the
// connectivity states of the locality entries. Entries with weight 0 are
// ignored, and nothing is reported while a fallback policy exists.
2072 void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
2073 // If we are in fallback mode, don't generate an xds picker from localities.
2074 if (xds_policy_->fallback_policy_ != nullptr) return;
2075 // Construct a new xds picker which maintains a map of all locality pickers
2076 // that are ready. Each locality is represented by a portion of the range
2077 // proportional to its weight, such that the total range is the sum of the
2078 // weights of all localities.
2080 size_t num_connecting = 0;
2081 size_t num_idle = 0;
2082 size_t num_transient_failures = 0;
2083 Picker::PickerList pickers;
2084 for (auto& p : map_) {
2085 const LocalityEntry* entry = p.second.get();
2086 if (entry->locality_weight() == 0) continue;
2087 switch (entry->connectivity_state()) {
2088 case GRPC_CHANNEL_READY: {
// Each READY locality is stored with the running sum of weights up to and
// including its own.
2089 end += entry->locality_weight();
2090 pickers.push_back(MakePair(end, entry->picker_wrapper()));
2093 case GRPC_CHANNEL_CONNECTING: {
2097 case GRPC_CHANNEL_IDLE: {
2101 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
2102 num_transient_failures++;
2106 GPR_UNREACHABLE_CODE(return );
2109 // Pass on the constructed xds picker if it has any ready pickers in their map
2110 // otherwise pass a QueuePicker if any of the locality pickers are in a
2111 // connecting or idle state, finally return a transient failure picker if all
2112 // locality pickers are in transient failure.
2113 if (!pickers.empty()) {
2114 xds_policy_->channel_control_helper()->UpdateState(
2116 UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
2117 New<Picker>(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"),
2118 std::move(pickers))));
2119 } else if (num_connecting > 0) {
2120 xds_policy_->channel_control_helper()->UpdateState(
2121 GRPC_CHANNEL_CONNECTING,
2122 UniquePtr<SubchannelPicker>(
2123 New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
2124 } else if (num_idle > 0) {
2125 xds_policy_->channel_control_helper()->UpdateState(
2127 UniquePtr<SubchannelPicker>(
2128 New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
// The error handed to the failure picker carries status UNAVAILABLE.
2131 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2132 "connections to all active localities failing"),
2133 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2134 xds_policy_->channel_control_helper()->UpdateState(
2135 GRPC_CHANNEL_TRANSIENT_FAILURE,
2136 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
// Shuts the locality map down by removing every entry from map_.
2140 void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); }
// Forwards the backoff reset to every locality entry in the map.
2142 void XdsLb::LocalityMap::ResetBackoffLocked() {
2143 for (auto& p : map_) {
2144 p.second->ResetBackoffLocked();
2149 // XdsLb::LocalityMap::LocalityEntry
// Creates the entry for locality \a name, owned by policy \a parent, and
// prepares the delayed-removal closure so that it runs
// OnDelayedRemovalTimerLocked on the parent's combiner with this entry as
// its argument.
2152 XdsLb::LocalityMap::LocalityEntry::LocalityEntry(
2153 RefCountedPtr<XdsLb> parent, RefCountedPtr<XdsLocalityName> name)
2154 : parent_(std::move(parent)), name_(std::move(name)) {
2155 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2156 gpr_log(GPR_INFO, "[xdslb %p] created LocalityEntry %p for %s",
2157 parent_.get(), this, name_->AsHumanReadableString());
2159 GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
2160 this, grpc_combiner_scheduler(parent_->combiner()));
// Destroys the entry and releases the "LocalityEntry" ref it holds on the
// parent policy.
2163 XdsLb::LocalityMap::LocalityEntry::~LocalityEntry() {
2164 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2166 "[xdslb %p] LocalityEntry %p %s: destroying locality entry",
2167 parent_.get(), this, name_->AsHumanReadableString());
2169 parent_.reset(DEBUG_LOCATION, "LocalityEntry");
// Returns a copy of \a args_in extended with the channel args that every
// child policy of a locality receives.
// NOTE(review): the return-type line is not visible in this view.
2173 XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked(
2174 const grpc_channel_args* args_in) {
2175 const grpc_arg args_to_add[] = {
2176 // A channel arg indicating if the target is a backend inferred from a
2177 // xds load balancer.
2178 grpc_channel_arg_integer_create(
2179 const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
2181 // Inhibit client-side health checking, since the balancer does
2183 grpc_channel_arg_integer_create(
2184 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
2186 return grpc_channel_args_copy_and_add(args_in, args_to_add,
2187 GPR_ARRAY_SIZE(args_to_add));
// Instantiates a child LB policy called \a name through the LB policy
// registry, giving it the parent's combiner, \a args and a new Helper that
// holds a ref to this entry. On success the helper is told which policy it
// serves and the pollset_sets are linked.
2190 OrphanablePtr<LoadBalancingPolicy>
2191 XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
2192 const char* name, const grpc_channel_args* args) {
2193 Helper* helper = New<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
2194 LoadBalancingPolicy::Args lb_policy_args;
2195 lb_policy_args.combiner = parent_->combiner();
2196 lb_policy_args.args = args;
// The helper is handed over to the new policy's args here; the raw pointer is
// kept only to call set_child() below.
2197 lb_policy_args.channel_control_helper =
2198 UniquePtr<ChannelControlHelper>(helper);
2199 OrphanablePtr<LoadBalancingPolicy> lb_policy =
2200 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
2201 name, std::move(lb_policy_args));
2202 if (GPR_UNLIKELY(lb_policy == nullptr)) {
2204 "[xdslb %p] LocalityEntry %p %s: failure creating child policy %s",
2205 parent_.get(), this, name_->AsHumanReadableString(), name);
2208 helper->set_child(lb_policy.get());
2209 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2211 "[xdslb %p] LocalityEntry %p %s: Created new child policy %s (%p)",
2212 parent_.get(), this, name_->AsHumanReadableString(), name,
2215 // Add the xDS's interested_parties pollset_set to that of the newly created
2216 // child policy. This will make the child policy progress upon activity on xDS
2217 // LB, which in turn is tied to the application's call.
2218 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2219 parent_->interested_parties());
// Applies an update to this locality: stores \a locality_weight, cancels a
// pending delayed-removal timer, and then creates or updates the child policy
// with \a serverlist, \a child_policy_config and channel args derived from
// \a args_in. Does nothing while the parent policy is shutting down.
2223 void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
2224 uint32_t locality_weight, ServerAddressList serverlist,
2225 LoadBalancingPolicy::Config* child_policy_config,
2226 const grpc_channel_args* args_in) {
2227 if (parent_->shutting_down_) return;
2228 // Update locality weight.
2229 locality_weight_ = locality_weight;
// The locality is in use again, so a scheduled removal must not go ahead.
2230 if (delayed_removal_timer_callback_pending_) {
2231 grpc_timer_cancel(&delayed_removal_timer_);
2233 // Construct update args.
2234 UpdateArgs update_args;
2235 update_args.addresses = std::move(serverlist);
2236 update_args.config =
2237 child_policy_config == nullptr ? nullptr : child_policy_config->Ref();
2238 update_args.args = CreateChildPolicyArgsLocked(args_in);
2239 // If the child policy name changes, we need to create a new child
2240 // policy. When this happens, we leave child_policy_ as-is and store
2241 // the new child policy in pending_child_policy_. Once the new child
2242 // policy transitions into state READY, we swap it into child_policy_,
2243 // replacing the original child policy. So pending_child_policy_ is
2244 // non-null only between when we apply an update that changes the child
2245 // policy name and when the new child reports state READY.
2247 // Updates can arrive at any point during this transition. We always
2248 // apply updates relative to the most recently created child policy,
2249 // even if the most recent one is still in pending_child_policy_. This
2250 // is true both when applying the updates to an existing child policy
2251 // and when determining whether we need to create a new policy.
2253 // As a result of this, there are several cases to consider here:
2255 // 1. We have no existing child policy (i.e., we have started up but
2256 // have not yet received a serverlist from the balancer or gone
2257 // into fallback mode; in this case, both child_policy_ and
2258 // pending_child_policy_ are null). In this case, we create a
2259 // new child policy and store it in child_policy_.
2261 // 2. We have an existing child policy and have no pending child policy
2262 // from a previous update (i.e., either there has not been a
2263 // previous update that changed the policy name, or we have already
2264 // finished swapping in the new policy; in this case, child_policy_
2265 // is non-null but pending_child_policy_ is null). In this case:
2266 // a. If child_policy_->name() equals child_policy_name, then we
2267 // update the existing child policy.
2268 // b. If child_policy_->name() does not equal child_policy_name,
2269 // we create a new policy. The policy will be stored in
2270 // pending_child_policy_ and will later be swapped into
2271 // child_policy_ by the helper when the new child transitions
2272 // into state READY.
2274 // 3. We have an existing child policy and have a pending child policy
2275 // from a previous update (i.e., a previous update set
2276 // pending_child_policy_ as per case 2b above and that policy has
2277 // not yet transitioned into state READY and been swapped into
2278 // child_policy_; in this case, both child_policy_ and
2279 // pending_child_policy_ are non-null). In this case:
2280 // a. If pending_child_policy_->name() equals child_policy_name,
2281 // then we update the existing pending child policy.
2282 // b. If pending_child_policy->name() does not equal
2283 // child_policy_name, then we create a new policy. The new
2284 // policy is stored in pending_child_policy_ (replacing the one
2285 // that was there before, which will be immediately shut down)
2286 // and will later be swapped into child_policy_ by the helper
2287 // when the new child transitions into state READY.
2288 // TODO(juanlishen): If the child policy is not configured via service config,
2289 // use whatever algorithm is specified by the balancer.
2290 const char* child_policy_name = child_policy_config == nullptr
2292 : child_policy_config->name();
// The three alternatives below correspond, in order, to cases 1, 2b and 3b.
2293 const bool create_policy =
2295 child_policy_ == nullptr ||
2297 (pending_child_policy_ == nullptr &&
2298 strcmp(child_policy_->name(), child_policy_name) != 0) ||
2300 (pending_child_policy_ != nullptr &&
2301 strcmp(pending_child_policy_->name(), child_policy_name) != 0);
2302 LoadBalancingPolicy* policy_to_update = nullptr;
2303 if (create_policy) {
2304 // Cases 1, 2b, and 3b: create a new child policy.
2305 // If child_policy_ is null, we set it (case 1), else we set
2306 // pending_child_policy_ (cases 2b and 3b).
2307 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2309 "[xdslb %p] LocalityEntry %p %s: Creating new %schild policy %s",
2310 parent_.get(), this, name_->AsHumanReadableString(),
2311 child_policy_ == nullptr ? "" : "pending ", child_policy_name);
2314 child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
2315 lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
2316 policy_to_update = lb_policy.get();
2318 // Cases 2a and 3a: update an existing policy.
2319 // If we have a pending child policy, send the update to the pending
2320 // policy (case 3a), else send it to the current policy (case 2a).
2321 policy_to_update = pending_child_policy_ != nullptr
2322 ? pending_child_policy_.get()
2323 : child_policy_.get();
2325 GPR_ASSERT(policy_to_update != nullptr);
2326 // Update the policy.
2327 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2329 "[xdslb %p] LocalityEntry %p %s: Updating %schild policy %p",
2330 parent_.get(), this, name_->AsHumanReadableString(),
2331 policy_to_update == pending_child_policy_.get() ? "pending " : "",
2334 policy_to_update->UpdateLocked(std::move(update_args));
// Shuts the entry down: unlinks and releases the child policy and, if
// present, the pending child policy, drops the cached picker wrapper and
// cancels a pending delayed-removal timer.
// NOTE(review): child_policy_ is dereferenced without a null check, unlike
// pending_child_policy_ -- confirm it is always set by the time this runs.
2337 void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() {
2338 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2340 "[xdslb %p] LocalityEntry %p %s: shutting down locality entry",
2341 parent_.get(), this, name_->AsHumanReadableString());
2343 // Remove the child policy's interested_parties pollset_set from the
2345 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
2346 parent_->interested_parties());
2347 child_policy_.reset();
2348 if (pending_child_policy_ != nullptr) {
2349 grpc_pollset_set_del_pollset_set(
2350 pending_child_policy_->interested_parties(),
2351 parent_->interested_parties());
2352 pending_child_policy_.reset();
2354 // Drop our ref to the child's picker, in case it's holding a ref to
2356 picker_wrapper_.reset();
2357 if (delayed_removal_timer_callback_pending_) {
2358 grpc_timer_cancel(&delayed_removal_timer_);
// Forwards the backoff reset to the child policy and, if present, to the
// pending child policy.
// NOTE(review): child_policy_ is dereferenced without a null check -- confirm
// it is always set by the time this runs.
2362 void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
2363 child_policy_->ResetBackoffLocked();
2364 if (pending_child_policy_ != nullptr) {
2365 pending_child_policy_->ResetBackoffLocked();
// Orphan hook of the locality entry.
// NOTE(review): the body lines of this function are not visible in this
// view; see the full file for what it does.
2369 void XdsLb::LocalityMap::LocalityEntry::Orphan() {
// Takes the locality out of use. With a retention interval of 0 the entry is
// erased from the parent's locality map at once. Otherwise its weight is set
// to 0 and a timer is started that fires after the retention interval. An
// entry whose weight is already 0 is left alone.
2374 void XdsLb::LocalityMap::LocalityEntry::DeactivateLocked() {
2375 // If locality retaining is disabled, delete the locality immediately.
2376 if (parent_->locality_retention_interval_ms_ == 0) {
// NOTE(review): this erases from the map that owns this entry; callers
// iterating over that map need to account for it.
2377 parent_->locality_map_.map_.erase(name_);
2380 // If already deactivated, don't do that again.
2381 if (locality_weight_ == 0) return;
2382 // Set the locality weight to 0 so that future xds picker won't contain this
2384 locality_weight_ = 0;
2385 // Start a timer to delete the locality.
// This ref is released in OnDelayedRemovalTimerLocked().
2386 Ref(DEBUG_LOCATION, "LocalityEntry+timer").release();
2388 &delayed_removal_timer_,
2389 ExecCtx::Get()->Now() + parent_->locality_retention_interval_ms_,
2390 &on_delayed_removal_timer_);
2391 delayed_removal_timer_callback_pending_ = true;
// Closure callback for the delayed-removal timer; \a arg is the LocalityEntry
// that started it. The entry is erased from the parent's locality map only
// when \a error is GRPC_ERROR_NONE and the weight is still 0. The
// "LocalityEntry+timer" ref is released in every case.
2394 void XdsLb::LocalityMap::LocalityEntry::OnDelayedRemovalTimerLocked(
2395 void* arg, grpc_error* error) {
2396 LocalityEntry* self = static_cast<LocalityEntry*>(arg);
2397 self->delayed_removal_timer_callback_pending_ = false;
2398 if (error == GRPC_ERROR_NONE && self->locality_weight_ == 0) {
2399 self->parent_->locality_map_.map_.erase(self->name_);
2401 self->Unref(DEBUG_LOCATION, "LocalityEntry+timer");
2405 // XdsLb::LocalityEntry::Helper
// Returns true if the policy this helper serves is the entry's pending child
// policy. Asserts that the helper has been given its child.
2408 bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
2409 GPR_ASSERT(child_ != nullptr);
2410 return child_ == entry_->pending_child_policy_.get();
// Returns true if the policy this helper serves is the entry's current child
// policy. Asserts that the helper has been given its child.
2413 bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
2414 GPR_ASSERT(child_ != nullptr);
2415 return child_ == entry_->child_policy_.get();
// Creates a subchannel on behalf of the child policy by delegating to the
// parent policy's channel control helper. The guard below singles out the
// cases where the parent is shutting down or the caller is neither the
// pending nor the current child.
2418 RefCountedPtr<SubchannelInterface>
2419 XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
2420 const grpc_channel_args& args) {
2421 if (entry_->parent_->shutting_down_ ||
2422 (!CalledByPendingChild() && !CalledByCurrentChild())) {
2425 return entry_->parent_->channel_control_helper()->CreateSubchannel(args);
// Receives a connectivity state and picker from a child policy of the entry.
//
// A pending child is swapped into child_policy_ once it reports READY. For
// the current child, the state and picker are cached in the entry and the
// xds picker is regenerated. A READY state additionally cancels the
// fallback-at-startup checks and exits fallback mode. Ignored while the
// parent policy is shutting down.
2428 void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
2429 grpc_connectivity_state state, UniquePtr<SubchannelPicker> picker) {
2430 if (entry_->parent_->shutting_down_) return;
2431 // If this request is from the pending child policy, ignore it until
2432 // it reports READY, at which point we swap it into place.
2433 if (CalledByPendingChild()) {
2434 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2436 "[xdslb %p helper %p] pending child policy %p reports state=%s",
2437 entry_->parent_.get(), this, entry_->pending_child_policy_.get(),
2438 grpc_connectivity_state_name(state));
2440 if (state != GRPC_CHANNEL_READY) return;
// Unlink the outgoing child policy before the pending one replaces it.
2441 grpc_pollset_set_del_pollset_set(
2442 entry_->child_policy_->interested_parties(),
2443 entry_->parent_->interested_parties());
2444 entry_->child_policy_ = std::move(entry_->pending_child_policy_);
2445 } else if (!CalledByCurrentChild()) {
2446 // This request is from an outdated child, so ignore it.
2449 // At this point, child_ must be the current child policy.
2450 if (state == GRPC_CHANNEL_READY) {
2451 entry_->parent_->MaybeCancelFallbackAtStartupChecks();
2452 entry_->parent_->MaybeExitFallbackMode();
2454 GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
2455 // Cache the picker and its state in the entry.
2456 entry_->picker_wrapper_ = MakeRefCounted<PickerWrapper>(
2458 entry_->parent_->client_stats_.FindLocalityStats(entry_->name_));
2459 entry_->connectivity_state_ = state;
2460 // Construct a new xds picker and pass it to the channel.
2461 entry_->parent_->locality_map_.UpdateXdsPickerLocked();
// Handles a re-resolution request from a child policy. The request is passed
// up to the parent's channel control helper only when the LB channel has no
// EDS call or that call has not seen a response yet. Ignored while the
// parent policy is shutting down.
2464 void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {
2465 if (entry_->parent_->shutting_down_) return;
2466 // If there is a pending child policy, ignore re-resolution requests
2467 // from the current child policy (or any outdated child).
2468 if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
2471 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2473 "[xdslb %p] Re-resolution requested from the internal RR policy "
2475 entry_->parent_.get(), entry_->child_policy_.get());
2477 GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
2478 // If we are talking to a balancer, we expect to get updated addresses
2479 // from the balancer, so we can ignore the re-resolution request from
2480 // the child policy. Otherwise, pass the re-resolution request up to the
2482 if (entry_->parent_->lb_chand_->eds_calld() == nullptr ||
2483 !entry_->parent_->lb_chand_->eds_calld()->seen_response()) {
2484 entry_->parent_->channel_control_helper()->RequestReresolution();
// Forwards a trace event from the child policy to the parent policy's channel
// control helper. The guard below singles out the cases where the parent is
// shutting down or the caller is neither the pending nor the current child.
2488 void XdsLb::LocalityMap::LocalityEntry::Helper::AddTraceEvent(
2489 TraceSeverity severity, StringView message) {
2490 if (entry_->parent_->shutting_down_ ||
2491 (!CalledByPendingChild() && !CalledByCurrentChild())) {
2494 entry_->parent_->channel_control_helper()->AddTraceEvent(severity, message);
// LoadBalancingPolicyFactory implementation for the xds policy: it creates
// XdsLb instances and parses the policy's JSON configuration into a
// ParsedXdsConfig.
2501 class XdsFactory : public LoadBalancingPolicyFactory {
// Creates a new XdsLb from `args` and returns it wrapped in an OrphanablePtr.
2503 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2504 LoadBalancingPolicy::Args args) const override {
2505 return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args)));
// Returns the policy name constant kXds.
2508 const char* name() const override { return kXds; }
// Parses the xds policy configuration rooted at `json`.
//
// `error` must be non-null and must point at GRPC_ERROR_NONE on entry (see
// the assertion below). Recognised child keys are "balancerName",
// "childPolicy" and "fallbackPolicy"; no branch for any other key is visible
// here, so other keys appear to be ignored. Problems found while walking the
// fields are accumulated in `error_list` rather than aborting at the first
// one. With no accumulated errors a new ParsedXdsConfig is returned;
// otherwise `*error` is set to a combined "Xds Parser" error (the value
// returned on that path is presumably nullptr -- confirm).
2510 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
2511 const grpc_json* json, grpc_error** error) const override {
2512 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
2513 if (json == nullptr) {
2514 // xds was mentioned as a policy in the deprecated loadBalancingPolicy
2515 // field or in the client API.
2516 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2517 "field:loadBalancingPolicy error:Xds Parser has required field - "
2518 "balancerName. Please use loadBalancingConfig field of service "
// Sanity check: the JSON node handed in is keyed by this factory's name().
2522 GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
// Accumulators for the walk over the config's child fields.
2524 InlinedVector<grpc_error*, 3> error_list;
2525 const char* balancer_name = nullptr;
2526 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
2527 RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy;
2528 for (const grpc_json* field = json->child; field != nullptr;
2529 field = field->next) {
// Fields without a key cannot be matched against any known name; skip them.
2530 if (field->key == nullptr) continue;
// "balancerName": must be a JSON string and must appear at most once.
// balancer_name points at field->value; it is not copied here.
2531 if (strcmp(field->key, "balancerName") == 0) {
2532 if (balancer_name != nullptr) {
2533 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2534 "field:balancerName error:Duplicate entry"));
2536 if (field->type != GRPC_JSON_STRING) {
2537 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2538 "field:balancerName error:type should be string"));
2541 balancer_name = field->value;
// "childPolicy": parsed via LoadBalancingPolicyRegistry; a null result means
// the registry reported a parse error, which is added to error_list.
// NOTE(review): a duplicate entry is reported as an error, but the field
// still appears to be parsed and to overwrite the earlier value -- harmless
// if any error makes the whole parse fail, but worth confirming.
2542 } else if (strcmp(field->key, "childPolicy") == 0) {
2543 if (child_policy != nullptr) {
2544 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2545 "field:childPolicy error:Duplicate entry"));
2547 grpc_error* parse_error = GRPC_ERROR_NONE;
2548 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2549 field, &parse_error);
2550 if (child_policy == nullptr) {
2551 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
2552 error_list.push_back(parse_error);
// "fallbackPolicy": handled the same way as "childPolicy" above.
2554 } else if (strcmp(field->key, "fallbackPolicy") == 0) {
2555 if (fallback_policy != nullptr) {
2556 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2557 "field:fallbackPolicy error:Duplicate entry"));
2559 grpc_error* parse_error = GRPC_ERROR_NONE;
2560 fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2561 field, &parse_error);
2562 if (fallback_policy == nullptr) {
2563 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
2564 error_list.push_back(parse_error);
// NOTE(review): no visible check that "balancerName" was supplied, although
// the null-json error text above calls it a required field; balancer_name may
// therefore still be nullptr when passed to ParsedXdsConfig -- confirm that
// the config and its consumers tolerate a null name.
2568 if (error_list.empty()) {
2569 return RefCountedPtr<LoadBalancingPolicy::Config>(New<ParsedXdsConfig>(
2570 balancer_name, std::move(child_policy), std::move(fallback_policy)));
// Error path: fold every accumulated error into one "Xds Parser" error.
2572 *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list);
2580 } // namespace grpc_core
2583 // Plugin registration
// Plugin init hook: allocates an XdsFactory with grpc_core::New, wraps it in
// a grpc_core::UniquePtr<LoadBalancingPolicyFactory>, and registers it via
// LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory.
2586 void grpc_lb_policy_xds_init() {
2587 grpc_core::LoadBalancingPolicyRegistry::Builder::
2588 RegisterLoadBalancingPolicyFactory(
2589 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
2590 grpc_core::New<grpc_core::XdsFactory>()));
// Plugin shutdown hook. The body is empty: no teardown is performed here.
2593 void grpc_lb_policy_xds_shutdown() {}