legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
24 /// An internal channel (\a lb_chand_) is created for the addresses
25 /// that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose). The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down). If needed, we retry the
33 /// call. If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal child policy (round_robin) instance for distributing
38 /// requests across backends.  Whenever we receive a new serverlist from
39 /// the balancer, we update the child policy with the new list of
40 /// addresses.
41 ///
42 /// Once a child policy instance is in place (and getting updated as
43 /// described), calls for a pick or a cancellation will be serviced right away
44 /// by forwarding them to the child policy instance. Any time there's no child
45 /// policy available (i.e., right after the creation of the xDS policy), pick
46 /// requests are added to a list of pending picks to be flushed and serviced
47 /// when the child policy instance becomes available.
48 ///
49 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
50 /// high level design and details.
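///
/// As a rough illustration only (not taken from this file), a service config
/// that selects this policy might look like the following; the JSON field
/// names shown are assumptions based on ParsedXdsConfig below:
///
///   {
///     "loadBalancingConfig": [
///       { "xds_experimental": {
///           "balancerName": "dns:///lb.example.com:443",
///           "childPolicy": [ { "round_robin": {} } ],
///           "fallbackPolicy": [ { "pick_first": {} } ]
///       } }
///     ]
///   }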
51
52 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
53 // using that endpoint. Because of various transitive includes in uv.h,
54 // including windows.h on Windows, uv.h must be included before other system
55 // headers. Therefore, sockaddr.h must always be included first.
56 #include <grpc/support/port_platform.h>
57
58 #include "src/core/lib/iomgr/sockaddr.h"
59 #include "src/core/lib/iomgr/socket_utils.h"
60
61 #include <inttypes.h>
62 #include <limits.h>
63 #include <string.h>
64
65 #include <grpc/byte_buffer_reader.h>
66 #include <grpc/grpc.h>
67 #include <grpc/support/alloc.h>
68 #include <grpc/support/string_util.h>
69 #include <grpc/support/time.h>
70
72 #include "src/core/ext/filters/client_channel/client_channel.h"
73 #include "src/core/ext/filters/client_channel/lb_policy.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
78 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
80 #include "src/core/ext/filters/client_channel/parse_address.h"
81 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
82 #include "src/core/ext/filters/client_channel/server_address.h"
83 #include "src/core/ext/filters/client_channel/service_config.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/string.h"
88 #include "src/core/lib/gprpp/manual_constructor.h"
89 #include "src/core/lib/gprpp/map.h"
90 #include "src/core/lib/gprpp/memory.h"
91 #include "src/core/lib/gprpp/orphanable.h"
92 #include "src/core/lib/gprpp/ref_counted_ptr.h"
93 #include "src/core/lib/gprpp/sync.h"
94 #include "src/core/lib/iomgr/combiner.h"
95 #include "src/core/lib/iomgr/sockaddr.h"
96 #include "src/core/lib/iomgr/sockaddr_utils.h"
97 #include "src/core/lib/iomgr/timer.h"
98 #include "src/core/lib/slice/slice_hash_table.h"
99 #include "src/core/lib/slice/slice_internal.h"
100 #include "src/core/lib/slice/slice_string_helpers.h"
101 #include "src/core/lib/surface/call.h"
102 #include "src/core/lib/surface/channel.h"
103 #include "src/core/lib/surface/channel_init.h"
104 #include "src/core/lib/transport/static_metadata.h"
105
106 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
107 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
108 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
109 #define GRPC_XDS_RECONNECT_JITTER 0.2
110 #define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000
111 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
112 #define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000)
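// Note: with the reconnect values above, a failed LB call is retried with
// backoff of roughly 1s, 1.6s, 2.56s, ... (multiplied by 1.6 per attempt,
// with up to 20% jitter), capped at 120s between attempts.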
113
114 namespace grpc_core {
115
116 TraceFlag grpc_lb_xds_trace(false, "xds");
117
118 namespace {
119
120 constexpr char kXds[] = "xds_experimental";
121
122 class ParsedXdsConfig : public LoadBalancingPolicy::Config {
123  public:
124   ParsedXdsConfig(const char* balancer_name,
125                   RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
126                   RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
127       : balancer_name_(balancer_name),
128         child_policy_(std::move(child_policy)),
129         fallback_policy_(std::move(fallback_policy)) {}
130
131   const char* name() const override { return kXds; }
132
133   const char* balancer_name() const { return balancer_name_; };
134
135   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
136     return child_policy_;
137   }
138
139   RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const {
140     return fallback_policy_;
141   }
142
143  private:
144   const char* balancer_name_ = nullptr;
145   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
146   RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
147 };
148
149 class XdsLb : public LoadBalancingPolicy {
150  public:
151   explicit XdsLb(Args args);
152
153   const char* name() const override { return kXds; }
154
155   void UpdateLocked(UpdateArgs args) override;
156   void ResetBackoffLocked() override;
157
158  private:
159   // Contains a channel to the LB server and all the data related to the
160   // channel. Holds a ref to the xds policy.
161   class LbChannelState : public InternallyRefCounted<LbChannelState> {
162    public:
163     // An LB call wrapper that can restart a call upon failure. Holds a ref to
164     // the LB channel. The template parameter is the kind of wrapped LB call.
165     template <typename T>
166     class RetryableLbCall : public InternallyRefCounted<RetryableLbCall<T>> {
167      public:
168       explicit RetryableLbCall(RefCountedPtr<LbChannelState> lb_chand);
169
170       void Orphan() override;
171
172       void OnCallFinishedLocked();
173
174       T* lb_calld() const { return lb_calld_.get(); }
175       LbChannelState* lb_chand() const { return lb_chand_.get(); }
176
177      private:
178       void StartNewCallLocked();
179       void StartRetryTimerLocked();
180       static void OnRetryTimerLocked(void* arg, grpc_error* error);
181
182       // The wrapped LB call that talks to the LB server. It's instantiated
183       // every time we start a new call. It's null during call retry backoff.
184       OrphanablePtr<T> lb_calld_;
185       // The owning LB channel.
186       RefCountedPtr<LbChannelState> lb_chand_;
187
188       // Retry state.
189       BackOff backoff_;
190       grpc_timer retry_timer_;
191       grpc_closure on_retry_timer_;
192       bool retry_timer_callback_pending_ = false;
193
194       bool shutting_down_ = false;
195     };
196
197     // Contains an EDS (Endpoint Discovery Service) call to the LB server.
198     class EdsCallState : public InternallyRefCounted<EdsCallState> {
199      public:
200       // The ctor and dtor should not be used directly.
201       explicit EdsCallState(
202           RefCountedPtr<RetryableLbCall<EdsCallState>> parent);
203       ~EdsCallState() override;
204
205       void Orphan() override;
206
207       RetryableLbCall<EdsCallState>* parent() const { return parent_.get(); }
208       LbChannelState* lb_chand() const { return parent_->lb_chand(); }
209       XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); }
210       bool seen_response() const { return seen_response_; }
211
212      private:
213       static void OnResponseReceivedLocked(void* arg, grpc_error* error);
214       static void OnStatusReceivedLocked(void* arg, grpc_error* error);
215
216       bool IsCurrentCallOnChannel() const;
217
218       // The owning RetryableLbCall<>.
219       RefCountedPtr<RetryableLbCall<EdsCallState>> parent_;
220       bool seen_response_ = false;
221
222       // Always non-NULL.
223       grpc_call* lb_call_;
224
225       // recv_initial_metadata
226       grpc_metadata_array initial_metadata_recv_;
227
228       // send_message
229       grpc_byte_buffer* send_message_payload_ = nullptr;
230
231       // recv_message
232       grpc_byte_buffer* recv_message_payload_ = nullptr;
233       grpc_closure on_response_received_;
234
235       // recv_trailing_metadata
236       grpc_metadata_array trailing_metadata_recv_;
237       grpc_status_code status_code_;
238       grpc_slice status_details_;
239       grpc_closure on_status_received_;
240     };
241
242     // Contains an LRS (Load Reporting Service) call to the LB server.
243     class LrsCallState : public InternallyRefCounted<LrsCallState> {
244      public:
245       // The ctor and dtor should not be used directly.
246       explicit LrsCallState(
247           RefCountedPtr<RetryableLbCall<LrsCallState>> parent);
248       ~LrsCallState() override;
249
250       void Orphan() override;
251
252       void MaybeStartReportingLocked();
253
254       RetryableLbCall<LrsCallState>* parent() { return parent_.get(); }
255       LbChannelState* lb_chand() const { return parent_->lb_chand(); }
256       XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); }
257       bool seen_response() const { return seen_response_; }
258
259      private:
260       // Reports client-side load stats according to a fixed interval.
261       class Reporter : public InternallyRefCounted<Reporter> {
262        public:
263         Reporter(RefCountedPtr<LrsCallState> parent,
264                  grpc_millis report_interval)
265             : parent_(std::move(parent)), report_interval_(report_interval) {
266           GRPC_CLOSURE_INIT(
267               &on_next_report_timer_, OnNextReportTimerLocked, this,
268               grpc_combiner_scheduler(xdslb_policy()->combiner()));
269           GRPC_CLOSURE_INIT(
270               &on_report_done_, OnReportDoneLocked, this,
271               grpc_combiner_scheduler(xdslb_policy()->combiner()));
272           ScheduleNextReportLocked();
273         }
274
275         void Orphan() override;
276
277        private:
278         void ScheduleNextReportLocked();
279         static void OnNextReportTimerLocked(void* arg, grpc_error* error);
280         void SendReportLocked();
281         static void OnReportDoneLocked(void* arg, grpc_error* error);
282
283         bool IsCurrentReporterOnCall() const {
284           return this == parent_->reporter_.get();
285         }
286         XdsLb* xdslb_policy() const { return parent_->xdslb_policy(); }
287
288         // The owning LRS call.
289         RefCountedPtr<LrsCallState> parent_;
290
291         // The load reporting state.
292         const grpc_millis report_interval_;
293         bool last_report_counters_were_zero_ = false;
294         bool next_report_timer_callback_pending_ = false;
295         grpc_timer next_report_timer_;
296         grpc_closure on_next_report_timer_;
297         grpc_closure on_report_done_;
298       };
299
300       static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
301       static void OnResponseReceivedLocked(void* arg, grpc_error* error);
302       static void OnStatusReceivedLocked(void* arg, grpc_error* error);
303
304       bool IsCurrentCallOnChannel() const;
305
306       // The owning RetryableLbCall<>.
307       RefCountedPtr<RetryableLbCall<LrsCallState>> parent_;
308       bool seen_response_ = false;
309
310       // Always non-NULL.
311       grpc_call* lb_call_;
312
313       // recv_initial_metadata
314       grpc_metadata_array initial_metadata_recv_;
315
316       // send_message
317       grpc_byte_buffer* send_message_payload_ = nullptr;
318       grpc_closure on_initial_request_sent_;
319
320       // recv_message
321       grpc_byte_buffer* recv_message_payload_ = nullptr;
322       grpc_closure on_response_received_;
323
324       // recv_trailing_metadata
325       grpc_metadata_array trailing_metadata_recv_;
326       grpc_status_code status_code_;
327       grpc_slice status_details_;
328       grpc_closure on_status_received_;
329
330       // Load reporting state.
331       grpc_millis load_reporting_interval_ = 0;
332       OrphanablePtr<Reporter> reporter_;
333     };
334
335     LbChannelState(RefCountedPtr<XdsLb> xdslb_policy, const char* balancer_name,
336                    const grpc_channel_args& args);
337     ~LbChannelState();
338
339     void Orphan() override;
340
341     grpc_channel* channel() const { return channel_; }
342     XdsLb* xdslb_policy() const { return xdslb_policy_.get(); }
343     EdsCallState* eds_calld() const { return eds_calld_->lb_calld(); }
344     LrsCallState* lrs_calld() const { return lrs_calld_->lb_calld(); }
345
346     bool IsCurrentChannel() const {
347       return this == xdslb_policy_->lb_chand_.get();
348     }
349     bool IsPendingChannel() const {
350       return this == xdslb_policy_->pending_lb_chand_.get();
351     }
352     bool HasActiveEdsCall() const { return eds_calld_->lb_calld() != nullptr; }
353
354     void StartConnectivityWatchLocked();
355     void CancelConnectivityWatchLocked();
356     static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
357
358    private:
359     // The owning LB policy.
360     RefCountedPtr<XdsLb> xdslb_policy_;
361
362     // The channel and its status.
363     grpc_channel* channel_;
364     bool shutting_down_ = false;
365     grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE;
366     grpc_closure on_connectivity_changed_;
367
368     // The retryable XDS calls to the LB server.
369     OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_;
370     OrphanablePtr<RetryableLbCall<LrsCallState>> lrs_calld_;
371   };
372
373   // We need this wrapper for the following reasons:
374   // 1. To process per-locality load reporting.
375   // 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
376   // references to it by the xds picker and the locality entry.
377   class PickerWrapper : public RefCounted<PickerWrapper> {
378    public:
379     PickerWrapper(UniquePtr<SubchannelPicker> picker,
380                   RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
381         : picker_(std::move(picker)),
382           locality_stats_(std::move(locality_stats)) {
383       locality_stats_->RefByPicker();
384     }
385     ~PickerWrapper() { locality_stats_->UnrefByPicker(); }
386
387     PickResult Pick(PickArgs args);
388
389    private:
390     static void RecordCallCompletion(
391         void* arg, grpc_error* error,
392         LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
393         LoadBalancingPolicy::CallState* call_state);
394
395     UniquePtr<SubchannelPicker> picker_;
396     RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_;
397   };
398
399   // The picker will use a stateless weighting algorithm to pick the locality to
400   // use for each request.
401   class Picker : public SubchannelPicker {
402    public:
403     // Maintains a weighted list of pickers from each locality that is in ready
404     // state. The first element in the pair represents the end of a range
405     // proportional to the locality's weight. The start of the range is the
406     // previous value in the vector and is 0 for the first element.
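    // For example (illustrative numbers only): three localities A, B, and C
    // with weights 2, 3, and 5 would yield pickers_ = {{2, A}, {5, B},
    // {10, C}}; a key in [0, 2) picks A, [2, 5) picks B, and [5, 10) picks C.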
407     using PickerList =
408         InlinedVector<Pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>;
409     Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
410         : xds_policy_(std::move(xds_policy)),
411           pickers_(std::move(pickers)),
412           drop_config_(xds_policy_->drop_config_) {}
413
414     PickResult Pick(PickArgs args) override;
415
416    private:
417     // Calls the picker of the locality that the key falls within.
418     PickResult PickFromLocality(const uint32_t key, PickArgs args);
419
420     RefCountedPtr<XdsLb> xds_policy_;
421     PickerList pickers_;
422     RefCountedPtr<XdsDropConfig> drop_config_;
423   };
424
425   class FallbackHelper : public ChannelControlHelper {
426    public:
427     explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
428         : parent_(std::move(parent)) {}
429
430     ~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); }
431
432     RefCountedPtr<SubchannelInterface> CreateSubchannel(
433         const grpc_channel_args& args) override;
434     void UpdateState(grpc_connectivity_state state,
435                      UniquePtr<SubchannelPicker> picker) override;
436     void RequestReresolution() override;
437     void AddTraceEvent(TraceSeverity severity, StringView message) override;
438
439     void set_child(LoadBalancingPolicy* child) { child_ = child; }
440
441    private:
442     bool CalledByPendingFallback() const;
443     bool CalledByCurrentFallback() const;
444
445     RefCountedPtr<XdsLb> parent_;
446     LoadBalancingPolicy* child_ = nullptr;
447   };
448
449   class LocalityMap {
450    public:
451     class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
452      public:
453       LocalityEntry(RefCountedPtr<XdsLb> parent,
454                     RefCountedPtr<XdsLocalityName> name);
455       ~LocalityEntry();
456
457       void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist,
458                         LoadBalancingPolicy::Config* child_policy_config,
459                         const grpc_channel_args* args);
460       void ShutdownLocked();
461       void ResetBackoffLocked();
462       void DeactivateLocked();
463       void Orphan() override;
464
465       grpc_connectivity_state connectivity_state() const {
466         return connectivity_state_;
467       }
468       uint32_t locality_weight() const { return locality_weight_; }
469       RefCountedPtr<PickerWrapper> picker_wrapper() const {
470         return picker_wrapper_;
471       }
472
473      private:
474       class Helper : public ChannelControlHelper {
475        public:
476         explicit Helper(RefCountedPtr<LocalityEntry> entry)
477             : entry_(std::move(entry)) {}
478
479         ~Helper() { entry_.reset(DEBUG_LOCATION, "Helper"); }
480
481         RefCountedPtr<SubchannelInterface> CreateSubchannel(
482             const grpc_channel_args& args) override;
483         void UpdateState(grpc_connectivity_state state,
484                          UniquePtr<SubchannelPicker> picker) override;
485         void RequestReresolution() override;
486         void AddTraceEvent(TraceSeverity severity, StringView message) override;
487         void set_child(LoadBalancingPolicy* child) { child_ = child; }
488
489        private:
490         bool CalledByPendingChild() const;
491         bool CalledByCurrentChild() const;
492
493         RefCountedPtr<LocalityEntry> entry_;
494         LoadBalancingPolicy* child_ = nullptr;
495       };
496
497       // Methods for dealing with the child policy.
498       OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
499           const char* name, const grpc_channel_args* args);
500       grpc_channel_args* CreateChildPolicyArgsLocked(
501           const grpc_channel_args* args);
502
503       static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
504
505       RefCountedPtr<XdsLb> parent_;
506       RefCountedPtr<XdsLocalityName> name_;
507       OrphanablePtr<LoadBalancingPolicy> child_policy_;
508       OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
509       RefCountedPtr<PickerWrapper> picker_wrapper_;
510       grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
511       uint32_t locality_weight_;
512       grpc_closure on_delayed_removal_timer_;
513       grpc_timer delayed_removal_timer_;
514       bool delayed_removal_timer_callback_pending_ = false;
515     };
516
517     explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {}
518
519     void UpdateLocked(const XdsLocalityList& locality_list,
520                       LoadBalancingPolicy::Config* child_policy_config,
521                       const grpc_channel_args* args, XdsLb* parent,
522                       bool is_initial_update = false);
523     void UpdateXdsPickerLocked();
524     void ShutdownLocked();
525     void ResetBackoffLocked();
526
527    private:
528     XdsLb* xds_policy_;
529     Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<LocalityEntry>,
530         XdsLocalityName::Less>
531         map_;
532   };
533
534   ~XdsLb();
535
536   void ShutdownLocked() override;
537
538   // Helper function used in UpdateLocked().
539   void ProcessAddressesAndChannelArgsLocked(ServerAddressList addresses,
540                                             const grpc_channel_args& args);
541
542   // Parses the xds config given the JSON node of the first child of XdsConfig.
543   // If parsing succeeds, updates \a balancer_name_, and updates \a
544   // child_policy_config_ and \a fallback_policy_config_ if they are also
545   // found. Does nothing upon failure.
546   void ParseLbConfig(const ParsedXdsConfig* xds_config);
547
548   LbChannelState* LatestLbChannel() const {
549     return pending_lb_chand_ != nullptr ? pending_lb_chand_.get()
550                                         : lb_chand_.get();
551   }
552
553   // Methods for dealing with fallback state.
554   void MaybeCancelFallbackAtStartupChecks();
555   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
556   void UpdateFallbackPolicyLocked();
557   OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
558       const char* name, const grpc_channel_args* args);
559   void MaybeExitFallbackMode();
560
561   // Name of the backend server to connect to.
562   const char* server_name_ = nullptr;
563
564   // Name of the balancer to connect to.
565   UniquePtr<char> balancer_name_;
566
567   // Current channel args from the resolver.
568   grpc_channel_args* args_ = nullptr;
569
570   // Internal state.
571   bool shutting_down_ = false;
572
573   // The channel for communicating with the LB server.
574   OrphanablePtr<LbChannelState> lb_chand_;
575   OrphanablePtr<LbChannelState> pending_lb_chand_;
576
577   // Timeout in milliseconds for the LB call. 0 means no deadline.
578   const grpc_millis lb_call_timeout_ms_;
579
580   // Whether the checks for fallback at startup are ALL pending. There are
581   // several cases where this can be reset:
582   // 1. The fallback timer fires, we enter fallback mode.
583   // 2. Before the fallback timer fires, the LB channel becomes
584   // TRANSIENT_FAILURE or the LB call fails, we enter fallback mode.
585   // 3. Before the fallback timer fires, if any child policy in the locality map
586   // becomes READY, we cancel the fallback timer.
587   bool fallback_at_startup_checks_pending_ = false;
588   // Timeout in milliseconds before using the fallback backend addresses.
589   // 0 means not using fallback.
590   const grpc_millis lb_fallback_timeout_ms_;
591   // The backend addresses from the resolver.
592   ServerAddressList fallback_backend_addresses_;
593   // Fallback timer.
594   grpc_timer lb_fallback_timer_;
595   grpc_closure lb_on_fallback_;
596
597   // The policy to use for the fallback backends.
598   RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
599   // Non-null iff we are in fallback mode.
600   OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
601   OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
602
603   // The policy to use for the backends.
604   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
605   const grpc_millis locality_retention_interval_ms_;
606   // Map of policies to use in the backend
607   LocalityMap locality_map_;
608   // TODO(mhaidry) : Add support for multiple maps of localities
609   // with different priorities
610   XdsLocalityList locality_list_;
611   // TODO(mhaidry) : Add a pending locality map that may be swapped with
612   // the current one when new localities in the pending map are ready
613   // to accept connections
614
615   // The config for dropping calls.
616   RefCountedPtr<XdsDropConfig> drop_config_;
617
618   // The stats for client-side load reporting.
619   XdsClientStats client_stats_;
620 };
621
622 //
623 // XdsLb::PickerWrapper::Pick
624 //
625
626 LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
627     LoadBalancingPolicy::PickArgs args) {
628   // Forward the pick to the picker returned from the child policy.
629   PickResult result = picker_->Pick(args);
630   if (result.type != PickResult::PICK_COMPLETE ||
631       result.subchannel == nullptr || locality_stats_ == nullptr) {
632     return result;
633   }
634   // Record a call started.
635   locality_stats_->AddCallStarted();
636   // Intercept the recv_trailing_metadata op to record call completion.
637   result.recv_trailing_metadata_ready = RecordCallCompletion;
638   result.recv_trailing_metadata_ready_user_data =
639       locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
640   return result;
641 }
642
643 // Note that the following callback does not run in either the control plane
644 // combiner or the data plane combiner.
645 void XdsLb::PickerWrapper::RecordCallCompletion(
646     void* arg, grpc_error* error,
647     LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
648     LoadBalancingPolicy::CallState* call_state) {
649   XdsClientStats::LocalityStats* locality_stats =
650       static_cast<XdsClientStats::LocalityStats*>(arg);
651   const bool call_failed = error != GRPC_ERROR_NONE;
652   locality_stats->AddCallFinished(call_failed);
653   locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
654 }
655
656 //
657 // XdsLb::Picker
658 //
659
660 XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
661   // Handle drop.
662   const UniquePtr<char>* drop_category;
663   if (drop_config_->ShouldDrop(&drop_category)) {
664     xds_policy_->client_stats_.AddCallDropped(*drop_category);
665     PickResult result;
666     result.type = PickResult::PICK_COMPLETE;
667     return result;
668   }
669   // Generate a random number in [0, total weight).
670   const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
671   // Forward pick to whichever locality maps to the range in which the
672   // random number falls.
673   return PickFromLocality(key, args);
674 }
675
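// Binary search: finds the picker at the smallest index whose cumulative
// weight (the .first field of the pair) is strictly greater than the key,
// i.e., the locality whose weight range contains the key.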
676 XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
677                                                   PickArgs args) {
678   size_t mid = 0;
679   size_t start_index = 0;
680   size_t end_index = pickers_.size() - 1;
681   size_t index = 0;
682   while (end_index > start_index) {
683     mid = (start_index + end_index) / 2;
684     if (pickers_[mid].first > key) {
685       end_index = mid;
686     } else if (pickers_[mid].first < key) {
687       start_index = mid + 1;
688     } else {
689       index = mid + 1;
690       break;
691     }
692   }
693   if (index == 0) index = start_index;
694   GPR_ASSERT(pickers_[index].first > key);
695   return pickers_[index].second->Pick(args);
696 }
697
698 //
699 // XdsLb::FallbackHelper
700 //
701
702 bool XdsLb::FallbackHelper::CalledByPendingFallback() const {
703   GPR_ASSERT(child_ != nullptr);
704   return child_ == parent_->pending_fallback_policy_.get();
705 }
706
707 bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
708   GPR_ASSERT(child_ != nullptr);
709   return child_ == parent_->fallback_policy_.get();
710 }
711
712 RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
713     const grpc_channel_args& args) {
714   if (parent_->shutting_down_ ||
715       (!CalledByPendingFallback() && !CalledByCurrentFallback())) {
716     return nullptr;
717   }
718   return parent_->channel_control_helper()->CreateSubchannel(args);
719 }
720
721 void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
722                                         UniquePtr<SubchannelPicker> picker) {
723   if (parent_->shutting_down_) return;
724   // If this request is from the pending fallback policy, ignore it until
725   // it reports READY, at which point we swap it into place.
726   if (CalledByPendingFallback()) {
727     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
728       gpr_log(
729           GPR_INFO,
730           "[xdslb %p helper %p] pending fallback policy %p reports state=%s",
731           parent_.get(), this, parent_->pending_fallback_policy_.get(),
732           grpc_connectivity_state_name(state));
733     }
734     if (state != GRPC_CHANNEL_READY) return;
735     grpc_pollset_set_del_pollset_set(
736         parent_->fallback_policy_->interested_parties(),
737         parent_->interested_parties());
738     parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
739   } else if (!CalledByCurrentFallback()) {
740     // This request is from an outdated fallback policy, so ignore it.
741     return;
742   }
743   parent_->channel_control_helper()->UpdateState(state, std::move(picker));
744 }
745
746 void XdsLb::FallbackHelper::RequestReresolution() {
747   if (parent_->shutting_down_) return;
748   const LoadBalancingPolicy* latest_fallback_policy =
749       parent_->pending_fallback_policy_ != nullptr
750           ? parent_->pending_fallback_policy_.get()
751           : parent_->fallback_policy_.get();
752   if (child_ != latest_fallback_policy) return;
753   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
754     gpr_log(GPR_INFO,
755             "[xdslb %p] Re-resolution requested from the fallback policy (%p).",
756             parent_.get(), child_);
757   }
758   GPR_ASSERT(parent_->lb_chand_ != nullptr);
759   parent_->channel_control_helper()->RequestReresolution();
760 }
761
762 void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
763                                           StringView message) {
764   if (parent_->shutting_down_ ||
765       (!CalledByPendingFallback() && !CalledByCurrentFallback())) {
766     return;
767   }
768   parent_->channel_control_helper()->AddTraceEvent(severity, message);
769 }
770
771 //
772 // XdsLb::LbChannelState
773 //
774
775 XdsLb::LbChannelState::LbChannelState(RefCountedPtr<XdsLb> xdslb_policy,
776                                       const char* balancer_name,
777                                       const grpc_channel_args& args)
778     : InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace),
779       xdslb_policy_(std::move(xdslb_policy)) {
780   GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked,
781                     this, grpc_combiner_scheduler(xdslb_policy_->combiner()));
782   channel_ = CreateXdsBalancerChannel(balancer_name, args);
783   GPR_ASSERT(channel_ != nullptr);
784   eds_calld_.reset(New<RetryableLbCall<EdsCallState>>(
785       Ref(DEBUG_LOCATION, "LbChannelState+eds")));
786   lrs_calld_.reset(New<RetryableLbCall<LrsCallState>>(
787       Ref(DEBUG_LOCATION, "LbChannelState+lrs")));
788 }
789
790 XdsLb::LbChannelState::~LbChannelState() {
791   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
792     gpr_log(GPR_INFO, "[xdslb %p] Destroying LB channel %p", xdslb_policy(),
793             this);
794   }
795   grpc_channel_destroy(channel_);
796 }
797
798 void XdsLb::LbChannelState::Orphan() {
799   shutting_down_ = true;
800   eds_calld_.reset();
801   lrs_calld_.reset();
802   Unref(DEBUG_LOCATION, "LbChannelState+orphaned");
803 }
804
805 void XdsLb::LbChannelState::StartConnectivityWatchLocked() {
806   grpc_channel_element* client_channel_elem =
807       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
808   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
809   // Ref held by callback.
810   Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release();
811   grpc_client_channel_watch_connectivity_state(
812       client_channel_elem,
813       grpc_polling_entity_create_from_pollset_set(
814           xdslb_policy_->interested_parties()),
815       &connectivity_, &on_connectivity_changed_, nullptr);
816 }
817
818 void XdsLb::LbChannelState::CancelConnectivityWatchLocked() {
819   grpc_channel_element* client_channel_elem =
820       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
821   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
822   grpc_client_channel_watch_connectivity_state(
823       client_channel_elem,
824       grpc_polling_entity_create_from_pollset_set(
825           xdslb_policy_->interested_parties()),
826       nullptr, &on_connectivity_changed_, nullptr);
827 }
828
829 void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg,
830                                                         grpc_error* error) {
831   LbChannelState* self = static_cast<LbChannelState*>(arg);
832   if (!self->shutting_down_ &&
833       self->xdslb_policy_->fallback_at_startup_checks_pending_) {
834     if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
835       // Not in TRANSIENT_FAILURE.  Renew connectivity watch.
836       grpc_channel_element* client_channel_elem =
837           grpc_channel_stack_last_element(
838               grpc_channel_get_channel_stack(self->channel_));
839       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
840       grpc_client_channel_watch_connectivity_state(
841           client_channel_elem,
842           grpc_polling_entity_create_from_pollset_set(
843               self->xdslb_policy_->interested_parties()),
844           &self->connectivity_, &self->on_connectivity_changed_, nullptr);
845       return;  // Early out so we don't drop the ref below.
846     }
847     // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
848     // fallback mode immediately.
849     gpr_log(GPR_INFO,
850             "[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
851             "entering fallback mode",
852             self);
853     self->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
854     grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_);
855     self->xdslb_policy_->UpdateFallbackPolicyLocked();
856   }
857   // Done watching connectivity state, so drop ref.
858   self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done");
859 }
860
861 //
862 // XdsLb::LbChannelState::RetryableLbCall<>
863 //
864
865 template <typename T>
866 XdsLb::LbChannelState::RetryableLbCall<T>::RetryableLbCall(
867     RefCountedPtr<LbChannelState> lb_chand)
868     : lb_chand_(std::move(lb_chand)),
869       backoff_(
870           BackOff::Options()
871               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
872                                    1000)
873               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
874               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
875               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
876   GRPC_CLOSURE_INIT(
877       &on_retry_timer_, OnRetryTimerLocked, this,
878       grpc_combiner_scheduler(lb_chand_->xdslb_policy()->combiner()));
879   StartNewCallLocked();
880 }
881
882 template <typename T>
883 void XdsLb::LbChannelState::RetryableLbCall<T>::Orphan() {
884   shutting_down_ = true;
885   lb_calld_.reset();
886   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
887   this->Unref(DEBUG_LOCATION, "RetryableLbCall+orphaned");
888 }
889
890 template <typename T>
891 void XdsLb::LbChannelState::RetryableLbCall<T>::OnCallFinishedLocked() {
892   const bool seen_response = lb_calld_->seen_response();
893   lb_calld_.reset();
894   if (seen_response) {
895     // If we lost connection to the LB server, reset backoff and restart the LB
896     // call immediately.
897     backoff_.Reset();
898     StartNewCallLocked();
899   } else {
900     // If we failed to connect to the LB server, retry later.
901     StartRetryTimerLocked();
902   }
903 }
904
905 template <typename T>
906 void XdsLb::LbChannelState::RetryableLbCall<T>::StartNewCallLocked() {
907   if (shutting_down_) return;
908   GPR_ASSERT(lb_chand_->channel_ != nullptr);
909   GPR_ASSERT(lb_calld_ == nullptr);
910   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
911     gpr_log(GPR_INFO,
912             "[xdslb %p] Start new call from retryable call (lb_chand: %p, "
913             "retryable call: %p)",
914             lb_chand()->xdslb_policy(), lb_chand(), this);
915   }
916   lb_calld_ = MakeOrphanable<T>(
917       this->Ref(DEBUG_LOCATION, "RetryableLbCall+start_new_call"));
918 }
919
920 template <typename T>
921 void XdsLb::LbChannelState::RetryableLbCall<T>::StartRetryTimerLocked() {
922   if (shutting_down_) return;
923   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
924   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
925     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
926     gpr_log(GPR_INFO,
927             "[xdslb %p] Failed to connect to LB server (lb_chand: %p) "
928             "retry timer will fire in %" PRId64 "ms.",
929             lb_chand()->xdslb_policy(), lb_chand(), timeout);
930   }
931   this->Ref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_start").release();
932   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
933   retry_timer_callback_pending_ = true;
934 }
935
936 template <typename T>
937 void XdsLb::LbChannelState::RetryableLbCall<T>::OnRetryTimerLocked(
938     void* arg, grpc_error* error) {
939   RetryableLbCall* lb_calld = static_cast<RetryableLbCall*>(arg);
940   lb_calld->retry_timer_callback_pending_ = false;
941   if (!lb_calld->shutting_down_ && error == GRPC_ERROR_NONE) {
942     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
943       gpr_log(GPR_INFO,
944               "[xdslb %p] Retry timer fires (lb_chand: %p, retryable call: %p)",
945               lb_calld->lb_chand()->xdslb_policy(), lb_calld->lb_chand(),
946               lb_calld);
947     }
948     lb_calld->StartNewCallLocked();
949   }
950   lb_calld->Unref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_done");
951 }
952
953 //
954 // XdsLb::LbChannelState::EdsCallState
955 //
956
957 XdsLb::LbChannelState::EdsCallState::EdsCallState(
958     RefCountedPtr<RetryableLbCall<EdsCallState>> parent)
959     : InternallyRefCounted<EdsCallState>(&grpc_lb_xds_trace),
960       parent_(std::move(parent)) {
961   // Init the LB call. Note that the LB call will progress every time there's
962   // activity in xdslb_policy()->interested_parties(), which is comprised of
963   // the polling entities from client_channel.
964   GPR_ASSERT(xdslb_policy() != nullptr);
965   GPR_ASSERT(xdslb_policy()->server_name_ != nullptr);
966   GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0');
967   const grpc_millis deadline =
968       xdslb_policy()->lb_call_timeout_ms_ == 0
969           ? GRPC_MILLIS_INF_FUTURE
970           : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
971   // Create an LB call with the specified method name.
972   lb_call_ = grpc_channel_create_pollset_set_call(
973       lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
974       xdslb_policy()->interested_parties(),
975       GRPC_MDSTR_SLASH_ENVOY_DOT_API_DOT_V2_DOT_ENDPOINTDISCOVERYSERVICE_SLASH_STREAMENDPOINTS,
976       nullptr, deadline, nullptr);
977   GPR_ASSERT(lb_call_ != nullptr);
978   // Init the LB call request payload.
979   grpc_slice request_payload_slice =
980       XdsEdsRequestCreateAndEncode(xdslb_policy()->server_name_);
981   send_message_payload_ =
982       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
983   grpc_slice_unref_internal(request_payload_slice);
984   // Init other data associated with the LB call.
985   grpc_metadata_array_init(&initial_metadata_recv_);
986   grpc_metadata_array_init(&trailing_metadata_recv_);
987   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
988                     grpc_combiner_scheduler(xdslb_policy()->combiner()));
989   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
990                     grpc_combiner_scheduler(xdslb_policy()->combiner()));
991   // Start the call.
992   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
993     gpr_log(GPR_INFO,
994             "[xdslb %p] Starting EDS call (lb_chand: %p, lb_calld: %p, "
995             "lb_call: %p)",
996             xdslb_policy(), lb_chand(), this, lb_call_);
997   }
998   // Create the ops.
999   grpc_call_error call_error;
1000   grpc_op ops[3];
1001   memset(ops, 0, sizeof(ops));
1002   // Op: send initial metadata.
1003   grpc_op* op = ops;
1004   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1005   op->data.send_initial_metadata.count = 0;
1006   op->flags = 0;
1007   op->reserved = nullptr;
1008   op++;
1009   // Op: send request message.
1010   GPR_ASSERT(send_message_payload_ != nullptr);
1011   op->op = GRPC_OP_SEND_MESSAGE;
1012   op->data.send_message.send_message = send_message_payload_;
1013   op->flags = 0;
1014   op->reserved = nullptr;
1015   op++;
1016   call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
1017                                                  (size_t)(op - ops), nullptr);
1018   GPR_ASSERT(GRPC_CALL_OK == call_error);
1019   // Op: recv initial metadata.
1020   op = ops;
1021   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1022   op->data.recv_initial_metadata.recv_initial_metadata =
1023       &initial_metadata_recv_;
1024   op->flags = 0;
1025   op->reserved = nullptr;
1026   op++;
1027   // Op: recv response.
1028   op->op = GRPC_OP_RECV_MESSAGE;
1029   op->data.recv_message.recv_message = &recv_message_payload_;
1030   op->flags = 0;
1031   op->reserved = nullptr;
1032   op++;
1033   Ref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked").release();
1034   call_error = grpc_call_start_batch_and_execute(
1035       lb_call_, ops, (size_t)(op - ops), &on_response_received_);
1036   GPR_ASSERT(GRPC_CALL_OK == call_error);
1037   // Op: recv server status.
1038   op = ops;
1039   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1040   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1041   op->data.recv_status_on_client.status = &status_code_;
1042   op->data.recv_status_on_client.status_details = &status_details_;
1043   op->flags = 0;
1044   op->reserved = nullptr;
1045   op++;
1046   // This callback signals the end of the LB call, so it relies on the initial
1047   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1048   // unreffed.
1049   call_error = grpc_call_start_batch_and_execute(
1050       lb_call_, ops, (size_t)(op - ops), &on_status_received_);
1051   GPR_ASSERT(GRPC_CALL_OK == call_error);
1052 }
1053
1054 XdsLb::LbChannelState::EdsCallState::~EdsCallState() {
1055   grpc_metadata_array_destroy(&initial_metadata_recv_);
1056   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1057   grpc_byte_buffer_destroy(send_message_payload_);
1058   grpc_byte_buffer_destroy(recv_message_payload_);
1059   grpc_slice_unref_internal(status_details_);
1060   GPR_ASSERT(lb_call_ != nullptr);
1061   grpc_call_unref(lb_call_);
1062 }
1063
1064 void XdsLb::LbChannelState::EdsCallState::Orphan() {
1065   GPR_ASSERT(lb_call_ != nullptr);
1066   // If we are here because xdslb_policy wants to cancel the call,
1067   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1068   // we are here because xdslb_policy has to orphan a failed call, then the
1069   // following cancellation will be a no-op.
1070   grpc_call_cancel(lb_call_, nullptr);
1071   // Note that the initial ref is held by on_status_received_. So the
1072   // corresponding unref happens in on_status_received_ instead of here.
1073 }
1074
1075 void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
1076     void* arg, grpc_error* error) {
1077   EdsCallState* eds_calld = static_cast<EdsCallState*>(arg);
1078   LbChannelState* lb_chand = eds_calld->lb_chand();
1079   XdsLb* xdslb_policy = eds_calld->xdslb_policy();
1080   // Empty payload means the LB call was cancelled.
1081   if (!eds_calld->IsCurrentCallOnChannel() ||
1082       eds_calld->recv_message_payload_ == nullptr) {
1083     eds_calld->Unref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked");
1084     return;
1085   }
1086   // Read the response.
1087   grpc_byte_buffer_reader bbr;
1088   grpc_byte_buffer_reader_init(&bbr, eds_calld->recv_message_payload_);
1089   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1090   grpc_byte_buffer_reader_destroy(&bbr);
1091   grpc_byte_buffer_destroy(eds_calld->recv_message_payload_);
1092   eds_calld->recv_message_payload_ = nullptr;
1093   // TODO(juanlishen): When we convert this to use the xds protocol, the
1094   // balancer will send us a fallback timeout such that we should go into
1095   // fallback mode if we have lost contact with the balancer after a certain
1096   // period of time. We will need to save the timeout value here, and then
1097   // when the balancer call ends, we will need to start a timer for the
1098   // specified period of time, and if the timer fires, we go into fallback
1099   // mode. We will also need to cancel the timer when we receive a serverlist
1100   // from the balancer.
1101   // This anonymous lambda is a hack to avoid the usage of goto.
1102   [&]() {
1103     // Parse the response.
1104     XdsUpdate update;
1105     grpc_error* parse_error =
1106         XdsEdsResponseDecodeAndParse(response_slice, &update);
1107     if (parse_error != GRPC_ERROR_NONE) {
1108       gpr_log(GPR_ERROR, "[xdslb %p] EDS response parsing failed. error=%s",
1109               xdslb_policy, grpc_error_string(parse_error));
1110       GRPC_ERROR_UNREF(parse_error);
1111       return;
1112     }
1113     if (update.locality_list.empty() && !update.drop_all) {
1114       char* response_slice_str =
1115           grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1116       gpr_log(GPR_ERROR,
1117               "[xdslb %p] EDS response '%s' doesn't contain any valid locality "
1118               "but doesn't require dropping all calls. Ignoring.",
1119               xdslb_policy, response_slice_str);
1120       gpr_free(response_slice_str);
1121       return;
1122     }
1123     eds_calld->seen_response_ = true;
1124     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1125       gpr_log(GPR_INFO,
1126               "[xdslb %p] EDS response with %" PRIuPTR
1127               " localities and %" PRIuPTR
1128               " drop categories received (drop_all=%d)",
1129               xdslb_policy, update.locality_list.size(),
1130               update.drop_config->drop_category_list().size(), update.drop_all);
1131       for (size_t i = 0; i < update.locality_list.size(); ++i) {
1132         const XdsLocalityInfo& locality = update.locality_list[i];
1133         gpr_log(GPR_INFO,
1134                 "[xdslb %p] Locality %" PRIuPTR " %s contains %" PRIuPTR
1135                 " server addresses",
1136                 xdslb_policy, i,
1137                 locality.locality_name->AsHumanReadableString(),
1138                 locality.serverlist.size());
1139         for (size_t j = 0; j < locality.serverlist.size(); ++j) {
1140           char* ipport;
1141           grpc_sockaddr_to_string(&ipport, &locality.serverlist[j].address(),
1142                                   false);
1143           gpr_log(GPR_INFO,
1144                   "[xdslb %p] Locality %" PRIuPTR
1145                   " %s, server address %" PRIuPTR ": %s",
1146                   xdslb_policy, i,
1147                   locality.locality_name->AsHumanReadableString(), j, ipport);
1148           gpr_free(ipport);
1149         }
1150       }
1151       for (size_t i = 0; i < update.drop_config->drop_category_list().size();
1152            ++i) {
1153         const XdsDropConfig::DropCategory& drop_category =
1154             update.drop_config->drop_category_list()[i];
1155         gpr_log(GPR_INFO,
1156                 "[xdslb %p] Drop category %s has drop rate %d per million",
1157                 xdslb_policy, drop_category.name.get(),
1158                 drop_category.parts_per_million);
1159       }
1160     }
1161     // Pending LB channel receives a response; promote it.
1162     // Note that this call can't be on a discarded pending channel, because
1163     // such channels don't have any current call but we have checked this call
1164     // is a current call.
1165     if (!lb_chand->IsCurrentChannel()) {
1166       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1167         gpr_log(GPR_INFO,
1168                 "[xdslb %p] Pending LB channel %p receives EDS response; "
1169                 "promoting it to replace current LB channel %p",
1170                 xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get());
1171       }
1172       // TODO(juanlishen): Maybe promote the pending LB channel when the
1173       // response results in a READY locality map.
1174       xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_);
1175     }
1176     // At this point, lb_chand must be the current LB channel, so try to start
1177     // load reporting.
1178     LrsCallState* lrs_calld = lb_chand->lrs_calld_->lb_calld();
1179     if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1180     // If the balancer tells us to drop all the calls, we should exit fallback
1181     // mode immediately.
1182     if (update.drop_all) xdslb_policy->MaybeExitFallbackMode();
1183     // Update the drop config.
1184     const bool drop_config_changed =
1185         xdslb_policy->drop_config_ == nullptr ||
1186         *xdslb_policy->drop_config_ != *update.drop_config;
1187     xdslb_policy->drop_config_ = std::move(update.drop_config);
1188     // Ignore identical locality update.
1189     if (xdslb_policy->locality_list_ == update.locality_list) {
1190       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1191         gpr_log(GPR_INFO,
1192                 "[xdslb %p] Incoming locality list identical to current, "
1193                 "ignoring. (drop_config_changed=%d)",
1194                 xdslb_policy, drop_config_changed);
1195       }
1196       if (drop_config_changed) {
1197         xdslb_policy->locality_map_.UpdateXdsPickerLocked();
1198       }
1199       return;
1200     }
1201     // Update the locality list.
1202     xdslb_policy->locality_list_ = std::move(update.locality_list);
1203     // Update the locality map.
1204     xdslb_policy->locality_map_.UpdateLocked(
1205         xdslb_policy->locality_list_, xdslb_policy->child_policy_config_.get(),
1206         xdslb_policy->args_, xdslb_policy);
1207   }();
1208   grpc_slice_unref_internal(response_slice);
1209   if (xdslb_policy->shutting_down_) {
1210     eds_calld->Unref(DEBUG_LOCATION,
1211                      "EDS+OnResponseReceivedLocked+xds_shutdown");
1212     return;
1213   }
1214   // Keep listening for serverlist updates.
1215   grpc_op op;
1216   memset(&op, 0, sizeof(op));
1217   op.op = GRPC_OP_RECV_MESSAGE;
1218   op.data.recv_message.recv_message = &eds_calld->recv_message_payload_;
1219   op.flags = 0;
1220   op.reserved = nullptr;
1221   GPR_ASSERT(eds_calld->lb_call_ != nullptr);
1222   // Reuse the "EDS+OnResponseReceivedLocked" ref taken in ctor.
1223   const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1224       eds_calld->lb_call_, &op, 1, &eds_calld->on_response_received_);
1225   GPR_ASSERT(GRPC_CALL_OK == call_error);
1226 }
1227
1228 void XdsLb::LbChannelState::EdsCallState::OnStatusReceivedLocked(
1229     void* arg, grpc_error* error) {
1230   EdsCallState* eds_calld = static_cast<EdsCallState*>(arg);
1231   LbChannelState* lb_chand = eds_calld->lb_chand();
1232   XdsLb* xdslb_policy = eds_calld->xdslb_policy();
1233   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1234     char* status_details = grpc_slice_to_c_string(eds_calld->status_details_);
1235     gpr_log(GPR_INFO,
1236             "[xdslb %p] EDS call status received. Status = %d, details "
1237             "= '%s', (lb_chand: %p, eds_calld: %p, lb_call: %p), error '%s'",
1238             xdslb_policy, eds_calld->status_code_, status_details, lb_chand,
1239             eds_calld, eds_calld->lb_call_, grpc_error_string(error));
1240     gpr_free(status_details);
1241   }
1242   // Ignore status from a stale call.
1243   if (eds_calld->IsCurrentCallOnChannel()) {
1244     // Because this call is the current one on the channel, the channel can't
1245     // have been swapped out; otherwise, the call should have been reset.
1246     GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel());
1247     if (lb_chand != xdslb_policy->LatestLbChannel()) {
1248       // This channel must be the current one and there is a pending one. Swap
1249       // in the pending one and we are done.
1250       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1251         gpr_log(GPR_INFO,
1252                 "[xdslb %p] Promoting pending LB channel %p to replace "
1253                 "current LB channel %p",
1254                 xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get());
1255       }
1256       xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_);
1257     } else {
1258       // This channel is the most recently created one. Try to restart the call
1259       // and reresolve.
1260       eds_calld->parent_->OnCallFinishedLocked();
1261       xdslb_policy->channel_control_helper()->RequestReresolution();
1262       // If the fallback-at-startup checks are pending, go into fallback mode
1263       // immediately.  This short-circuits the timeout for the
1264       // fallback-at-startup case.
1265       if (xdslb_policy->fallback_at_startup_checks_pending_) {
1266         gpr_log(GPR_INFO,
1267                 "[xdslb %p] Balancer call finished; entering fallback mode",
1268                 xdslb_policy);
1269         xdslb_policy->fallback_at_startup_checks_pending_ = false;
1270         grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
1271         lb_chand->CancelConnectivityWatchLocked();
1272         xdslb_policy->UpdateFallbackPolicyLocked();
1273       }
1274     }
1275   }
1276   eds_calld->Unref(DEBUG_LOCATION, "EDS+OnStatusReceivedLocked");
1277 }
1278
1279 bool XdsLb::LbChannelState::EdsCallState::IsCurrentCallOnChannel() const {
1280   // If the retryable EDS call is null (which only happens when the LB channel
1281   // is shutting down), all the EDS calls are stale.
1282   if (lb_chand()->eds_calld_ == nullptr) return false;
1283   return this == lb_chand()->eds_calld_->lb_calld();
1284 }
1285
1286 //
1287 // XdsLb::LbChannelState::LrsCallState::Reporter
1288 //
1289
1290 void XdsLb::LbChannelState::LrsCallState::Reporter::Orphan() {
1291   if (next_report_timer_callback_pending_) {
1292     grpc_timer_cancel(&next_report_timer_);
1293   }
1294 }
1295
1296 void XdsLb::LbChannelState::LrsCallState::Reporter::ScheduleNextReportLocked() {
1297   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1298   grpc_timer_init(&next_report_timer_, next_report_time,
1299                   &on_next_report_timer_);
1300   next_report_timer_callback_pending_ = true;
1301 }
1302
1303 void XdsLb::LbChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1304     void* arg, grpc_error* error) {
1305   Reporter* self = static_cast<Reporter*>(arg);
1306   self->next_report_timer_callback_pending_ = false;
1307   if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
1308     self->Unref(DEBUG_LOCATION, "Reporter+timer");
1309     return;
1310   }
1311   self->SendReportLocked();
1312 }
1313
1314 void XdsLb::LbChannelState::LrsCallState::Reporter::SendReportLocked() {
1315   // Create a request that contains the load report.
1316   grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode(
1317       xdslb_policy()->server_name_, &xdslb_policy()->client_stats_);
1318   // Skip client load report if the counters were all zero in the last
1319   // report and they are still zero in this one.
1320   const bool old_val = last_report_counters_were_zero_;
1321   last_report_counters_were_zero_ = static_cast<bool>(
1322       grpc_slice_eq(request_payload_slice, grpc_empty_slice()));
1323   if (old_val && last_report_counters_were_zero_) {
1324     ScheduleNextReportLocked();
1325     return;
1326   }
1327   parent_->send_message_payload_ =
1328       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1329   grpc_slice_unref_internal(request_payload_slice);
1330   // Send the report.
1331   grpc_op op;
1332   memset(&op, 0, sizeof(op));
1333   op.op = GRPC_OP_SEND_MESSAGE;
1334   op.data.send_message.send_message = parent_->send_message_payload_;
1335   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1336       parent_->lb_call_, &op, 1, &on_report_done_);
1337   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1338     gpr_log(GPR_ERROR,
1339             "[xdslb %p] lb_calld=%p call_error=%d sending client load report",
1340             xdslb_policy(), this, call_error);
1341     GPR_ASSERT(GRPC_CALL_OK == call_error);
1342   }
1343 }
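
// Illustrative sketch (hypothetical type, not part of the original file): the
// skip rule implemented above reduces to "send a report unless both the
// previous snapshot and the current one were all-zero".  A minimal,
// self-contained version of that bookkeeping:
namespace {
struct ZeroReportFilterSketch {
  bool last_was_zero = false;
  // Returns true if the report should be sent, and records the current
  // snapshot's zero-ness for the next decision.
  bool ShouldSend(bool current_is_zero) {
    const bool send = !(last_was_zero && current_is_zero);
    last_was_zero = current_is_zero;
    return send;
  }
};
}  // namespace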
1344
1345 void XdsLb::LbChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1346     void* arg, grpc_error* error) {
1347   Reporter* self = static_cast<Reporter*>(arg);
1348   grpc_byte_buffer_destroy(self->parent_->send_message_payload_);
1349   self->parent_->send_message_payload_ = nullptr;
1350   if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
1351     // If this reporter is no longer the current one on the call, it may have
1352     // been orphaned in favor of a new one due to a config update.
1353     if (!self->IsCurrentReporterOnCall()) {
1354       self->parent_->MaybeStartReportingLocked();
1355     }
1356     self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1357     return;
1358   }
1359   self->ScheduleNextReportLocked();
1360 }
1361
1362 //
1363 // XdsLb::LbChannelState::LrsCallState
1364 //
1365
1366 XdsLb::LbChannelState::LrsCallState::LrsCallState(
1367     RefCountedPtr<RetryableLbCall<LrsCallState>> parent)
1368     : InternallyRefCounted<LrsCallState>(&grpc_lb_xds_trace),
1369       parent_(std::move(parent)) {
1370   // Init the LB call. Note that the LB call will progress every time there's
1371   // activity in xdslb_policy()->interested_parties(), which is comprised of
1372   // the polling entities from client_channel.
1373   GPR_ASSERT(xdslb_policy() != nullptr);
1374   GPR_ASSERT(xdslb_policy()->server_name_ != nullptr);
1375   GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0');
1376   const grpc_millis deadline =
1377       xdslb_policy()->lb_call_timeout_ms_ == 0
1378           ? GRPC_MILLIS_INF_FUTURE
1379           : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
1380   lb_call_ = grpc_channel_create_pollset_set_call(
1381       lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1382       xdslb_policy()->interested_parties(),
1383       GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS,
1384       nullptr, deadline, nullptr);
1385   GPR_ASSERT(lb_call_ != nullptr);
1386   // Init the LB call request payload.
1387   grpc_slice request_payload_slice =
1388       XdsLrsRequestCreateAndEncode(xdslb_policy()->server_name_);
1389   send_message_payload_ =
1390       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1391   grpc_slice_unref_internal(request_payload_slice);
1392   // Init other data associated with the LRS call.
1393   grpc_metadata_array_init(&initial_metadata_recv_);
1394   grpc_metadata_array_init(&trailing_metadata_recv_);
1395   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, this,
1396                     grpc_combiner_scheduler(xdslb_policy()->combiner()));
1397   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
1398                     grpc_combiner_scheduler(xdslb_policy()->combiner()));
1399   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
1400                     grpc_combiner_scheduler(xdslb_policy()->combiner()));
1401   // Start the call.
1402   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1403     gpr_log(GPR_INFO,
1404             "[xdslb %p] Starting LRS call (lb_chand: %p, lb_calld: %p, "
1405             "lb_call: %p)",
1406             xdslb_policy(), lb_chand(), this, lb_call_);
1407   }
1408   // Create the ops.
1409   grpc_call_error call_error;
1410   grpc_op ops[3];
1411   memset(ops, 0, sizeof(ops));
1412   // Op: send initial metadata.
1413   grpc_op* op = ops;
1414   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1415   op->data.send_initial_metadata.count = 0;
1416   op->flags = 0;
1417   op->reserved = nullptr;
1418   op++;
1419   // Op: send request message.
1420   GPR_ASSERT(send_message_payload_ != nullptr);
1421   op->op = GRPC_OP_SEND_MESSAGE;
1422   op->data.send_message.send_message = send_message_payload_;
1423   op->flags = 0;
1424   op->reserved = nullptr;
1425   op++;
1426   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1427   call_error = grpc_call_start_batch_and_execute(
1428       lb_call_, ops, (size_t)(op - ops), &on_initial_request_sent_);
1429   GPR_ASSERT(GRPC_CALL_OK == call_error);
1430   // Op: recv initial metadata.
1431   op = ops;
1432   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1433   op->data.recv_initial_metadata.recv_initial_metadata =
1434       &initial_metadata_recv_;
1435   op->flags = 0;
1436   op->reserved = nullptr;
1437   op++;
1438   // Op: recv response.
1439   op->op = GRPC_OP_RECV_MESSAGE;
1440   op->data.recv_message.recv_message = &recv_message_payload_;
1441   op->flags = 0;
1442   op->reserved = nullptr;
1443   op++;
1444   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1445   call_error = grpc_call_start_batch_and_execute(
1446       lb_call_, ops, (size_t)(op - ops), &on_response_received_);
1447   GPR_ASSERT(GRPC_CALL_OK == call_error);
1448   // Op: recv server status.
1449   op = ops;
1450   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1451   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1452   op->data.recv_status_on_client.status = &status_code_;
1453   op->data.recv_status_on_client.status_details = &status_details_;
1454   op->flags = 0;
1455   op->reserved = nullptr;
1456   op++;
1457   // This callback signals the end of the LB call, so it relies on the initial
1458   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1459   // unreffed.
1460   call_error = grpc_call_start_batch_and_execute(
1461       lb_call_, ops, (size_t)(op - ops), &on_status_received_);
1462   GPR_ASSERT(GRPC_CALL_OK == call_error);
1463 }
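
// Illustrative sketch (hypothetical helper, not part of the original file):
// the constructor above starts three separate batches on the same call --
// (1) send initial metadata + first request, (2) receive initial metadata +
// first response, and (3) receive trailing status -- each completed by its
// own closure and backed by its own ref.  In isolation, the send batch looks
// roughly like this:
namespace {
grpc_call_error StartLrsSendBatchSketch(grpc_call* call,
                                        grpc_byte_buffer* request_payload,
                                        grpc_closure* on_sent) {
  grpc_op ops[2];
  memset(ops, 0, sizeof(ops));
  grpc_op* op = ops;
  // Send initial metadata (empty) first.
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op++;
  // Then send the serialized request message.
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = request_payload;
  op++;
  return grpc_call_start_batch_and_execute(
      call, ops, static_cast<size_t>(op - ops), on_sent);
}
}  // namespace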
1464
1465 XdsLb::LbChannelState::LrsCallState::~LrsCallState() {
1466   grpc_metadata_array_destroy(&initial_metadata_recv_);
1467   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1468   grpc_byte_buffer_destroy(send_message_payload_);
1469   grpc_byte_buffer_destroy(recv_message_payload_);
1470   grpc_slice_unref_internal(status_details_);
1471   GPR_ASSERT(lb_call_ != nullptr);
1472   grpc_call_unref(lb_call_);
1473 }
1474
1475 void XdsLb::LbChannelState::LrsCallState::Orphan() {
1476   reporter_.reset();
1477   GPR_ASSERT(lb_call_ != nullptr);
1478   // If we are here because xdslb_policy wants to cancel the call,
1479   // on_status_received_ will complete the cancellation and clean up.
1480   // Otherwise, we are here because xdslb_policy has to orphan a failed call,
1481   // in which case the following cancellation will be a no-op.
1482   grpc_call_cancel(lb_call_, nullptr);
1483   // Note that the initial ref is held by on_status_received_, so the
1484   // corresponding unref happens in on_status_received_ instead of here.
1485 }
1486
1487 void XdsLb::LbChannelState::LrsCallState::MaybeStartReportingLocked() {
1488   // Don't start if this is not the current call on the current channel.
1489   if (!IsCurrentCallOnChannel() || !lb_chand()->IsCurrentChannel()) return;
1490   // Don't start again if already started.
1491   if (reporter_ != nullptr) return;
1492   // Don't start if the previous send_message op (of the initial request or the
1493   // last report of the previous reporter) hasn't completed.
1494   if (send_message_payload_ != nullptr) return;
1495   // Don't start if no LRS response has arrived.
1496   if (!seen_response()) return;
1497   // Don't start if the EDS call hasn't received any valid response. Note that
1498   // this must be the first channel, because it is the current channel and yet
1499   // its EDS call hasn't seen any response.
1500   EdsCallState* eds_calld = lb_chand()->eds_calld_->lb_calld();
1501   if (eds_calld == nullptr || !eds_calld->seen_response()) return;
1502   // Start reporting.
1503   lb_chand()->xdslb_policy_->client_stats_.MaybeInitLastReportTime();
1504   reporter_ = MakeOrphanable<Reporter>(
1505       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1506 }
1507
1508 void XdsLb::LbChannelState::LrsCallState::OnInitialRequestSentLocked(
1509     void* arg, grpc_error* error) {
1510   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1511   // Clear the send_message_payload_.
1512   grpc_byte_buffer_destroy(lrs_calld->send_message_payload_);
1513   lrs_calld->send_message_payload_ = nullptr;
1514   lrs_calld->MaybeStartReportingLocked();
1515   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1516 }
1517
1518 void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
1519     void* arg, grpc_error* error) {
1520   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1521   XdsLb* xdslb_policy = lrs_calld->xdslb_policy();
1522   // Empty payload means the LB call was cancelled.
1523   if (!lrs_calld->IsCurrentCallOnChannel() ||
1524       lrs_calld->recv_message_payload_ == nullptr) {
1525     lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1526     return;
1527   }
1528   // Read the response.
1529   grpc_byte_buffer_reader bbr;
1530   grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_);
1531   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1532   grpc_byte_buffer_reader_destroy(&bbr);
1533   grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_);
1534   lrs_calld->recv_message_payload_ = nullptr;
1535   // This anonymous lambda is a hack to avoid using goto.
1536   [&]() {
1537     // Parse the response.
1538     grpc_millis new_load_reporting_interval;
1539     grpc_error* parse_error = XdsLrsResponseDecodeAndParse(
1540         response_slice, &new_load_reporting_interval,
1541         xdslb_policy->server_name_);
1542     if (parse_error != GRPC_ERROR_NONE) {
1543       gpr_log(GPR_ERROR, "[xdslb %p] LRS response parsing failed. error=%s",
1544               xdslb_policy, grpc_error_string(parse_error));
1545       GRPC_ERROR_UNREF(parse_error);
1546       return;
1547     }
1548     lrs_calld->seen_response_ = true;
1549     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1550       gpr_log(GPR_INFO,
1551               "[xdslb %p] LRS response received, load_report_interval=%" PRId64
1552               "ms",
1553               xdslb_policy, new_load_reporting_interval);
1554     }
1555     if (new_load_reporting_interval <
1556         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1557       new_load_reporting_interval =
1558           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1559       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1560         gpr_log(
1561             GPR_INFO,
1562             "[xdslb %p] Increased load_report_interval to minimum value %dms",
1563             xdslb_policy, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1564       }
1565     }
1566     // Ignore identical update.
1567     if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval) {
1568       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1569         gpr_log(GPR_INFO,
1570                 "[xdslb %p] Incoming LRS response identical to current, "
1571                 "ignoring.",
1572                 xdslb_policy);
1573       }
1574       return;
1575     }
1576     // Stop current load reporting (if any) to adopt the new reporting interval.
1577     lrs_calld->reporter_.reset();
1578     // Record the new config.
1579     lrs_calld->load_reporting_interval_ = new_load_reporting_interval;
1580     // Try to start sending load reports.
1581     lrs_calld->MaybeStartReportingLocked();
1582   }();
1583   grpc_slice_unref_internal(response_slice);
1584   if (xdslb_policy->shutting_down_) {
1585     lrs_calld->Unref(DEBUG_LOCATION,
1586                      "LRS+OnResponseReceivedLocked+xds_shutdown");
1587     return;
1588   }
1589   // Keep listening for LRS config updates.
1590   grpc_op op;
1591   memset(&op, 0, sizeof(op));
1592   op.op = GRPC_OP_RECV_MESSAGE;
1593   op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_;
1594   op.flags = 0;
1595   op.reserved = nullptr;
1596   GPR_ASSERT(lrs_calld->lb_call_ != nullptr);
1597   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1598   const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1599       lrs_calld->lb_call_, &op, 1, &lrs_calld->on_response_received_);
1600   GPR_ASSERT(GRPC_CALL_OK == call_error);
1601 }
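
// Illustrative sketch (hypothetical helper, not part of the original file):
// the interval handling above is a simple lower clamp -- any interval below
// the configured minimum is raised to that minimum before being adopted, and
// an interval identical to the current one is ignored.
namespace {
grpc_millis ClampReportingIntervalSketch(grpc_millis requested_ms,
                                         grpc_millis min_ms) {
  return requested_ms < min_ms ? min_ms : requested_ms;
}
}  // namespace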
1602
1603 void XdsLb::LbChannelState::LrsCallState::OnStatusReceivedLocked(
1604     void* arg, grpc_error* error) {
1605   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1606   XdsLb* xdslb_policy = lrs_calld->xdslb_policy();
1607   LbChannelState* lb_chand = lrs_calld->lb_chand();
1608   GPR_ASSERT(lrs_calld->lb_call_ != nullptr);
1609   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1610     char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_);
1611     gpr_log(GPR_INFO,
1612             "[xdslb %p] LRS call status received. Status = %d, details "
1613             "= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'",
1614             xdslb_policy, lrs_calld->status_code_, status_details, lb_chand,
1615             lrs_calld, lrs_calld->lb_call_, grpc_error_string(error));
1616     gpr_free(status_details);
1617   }
1618   // Ignore status from a stale call.
1619   if (lrs_calld->IsCurrentCallOnChannel()) {
1620     // Because this call is the current one on the channel, the channel can't
1621     // have been swapped out; otherwise, the call should have been reset.
1622     GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel());
1623     GPR_ASSERT(!xdslb_policy->shutting_down_);
1624     if (lb_chand == xdslb_policy->LatestLbChannel()) {
1625       // This channel is the most recently created one. Try to restart the call
1626       // and reresolve.
1627       lrs_calld->parent_->OnCallFinishedLocked();
1628       xdslb_policy->channel_control_helper()->RequestReresolution();
1629     }
1630   }
1631   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1632 }
1633
1634 bool XdsLb::LbChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1635   // If the retryable LRS call is null (which only happens when the LB channel
1636   // is shutting down), all the LRS calls are stale.
1637   if (lb_chand()->lrs_calld_ == nullptr) return false;
1638   return this == lb_chand()->lrs_calld_->lb_calld();
1639 }
1640
1641 //
1642 // helper code for creating balancer channel
1643 //
1644
1645 // Returns the channel args for the LB channel, used to create a bidirectional
1646 // stream for the reception of load balancing updates.
1647 grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
1648   static const char* args_to_remove[] = {
1649       // LB policy name, since we want to use the default (pick_first) in
1650       // the LB channel.
1651       GRPC_ARG_LB_POLICY_NAME,
1652       // The service config that contains the LB config. We don't want to
1653       // recursively use xds in the LB channel.
1654       GRPC_ARG_SERVICE_CONFIG,
1655       // The channel arg for the server URI, since that will be different for
1656       // the LB channel than for the parent channel.  The client channel
1657       // factory will re-add this arg with the right value.
1658       GRPC_ARG_SERVER_URI,
1659       // The LB channel should use the authority indicated by the target
1660       // authority table (see \a ModifyXdsBalancerChannelArgs),
1661       // as opposed to the authority from the parent channel.
1662       GRPC_ARG_DEFAULT_AUTHORITY,
1663       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1664       // treated as a stand-alone channel and not inherit this argument from the
1665       // args of the parent channel.
1666       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1667       // Don't want to pass down channelz node from parent; the balancer
1668       // channel will get its own.
1669       GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1670   };
1671   // Channel args to add.
1672   InlinedVector<grpc_arg, 2> args_to_add;
1673   // A channel arg indicating the target is an xds load balancer.
1674   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1675       const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1));
1676   // The parent channel's channelz uuid.
1677   channelz::ChannelNode* channelz_node = nullptr;
1678   const grpc_arg* arg =
1679       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1680   if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
1681       arg->value.pointer.p != nullptr) {
1682     channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1683     args_to_add.emplace_back(
1684         channelz::MakeParentUuidArg(channelz_node->uuid()));
1685   }
1686   // Construct channel args.
1687   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1688       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1689       args_to_add.size());
1690   // Make any necessary modifications for security.
1691   return ModifyXdsBalancerChannelArgs(new_args);
1692 }
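
// Illustrative sketch (plain C++, hypothetical names, not part of the
// original file): conceptually, building the balancer-channel args is a
// "copy the parent args, drop these keys, then add these overrides" merge,
// which grpc_channel_args_copy_and_add_and_remove performs over
// grpc_channel_args.  The same idea over a simple string map:
#include <map>
#include <set>
#include <string>
namespace {
std::map<std::string, std::string> MergeChannelArgsSketch(
    std::map<std::string, std::string> parent_args,
    const std::set<std::string>& keys_to_remove,
    const std::map<std::string, std::string>& args_to_add) {
  for (const std::string& key : keys_to_remove) parent_args.erase(key);
  for (const auto& kv : args_to_add) parent_args[kv.first] = kv.second;
  return parent_args;
}
}  // namespace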
1693
1694 //
1695 // ctor and dtor
1696 //
1697
1698 XdsLb::XdsLb(Args args)
1699     : LoadBalancingPolicy(std::move(args)),
1700       lb_call_timeout_ms_(grpc_channel_args_find_integer(
1701           args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX})),
1702       lb_fallback_timeout_ms_(grpc_channel_args_find_integer(
1703           args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS,
1704           {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})),
1705       locality_retention_interval_ms_(grpc_channel_args_find_integer(
1706           args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS,
1707           {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})),
1708       locality_map_(this) {
1709   // Record server name.
1710   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1711   const char* server_uri = grpc_channel_arg_get_string(arg);
1712   GPR_ASSERT(server_uri != nullptr);
1713   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1714   GPR_ASSERT(uri->path[0] != '\0');
1715   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1716   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1717     gpr_log(GPR_INFO,
1718             "[xdslb %p] Will use '%s' as the server name for LB request.", this,
1719             server_name_);
1720   }
1721   grpc_uri_destroy(uri);
1722 }
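
// Illustrative sketch (hypothetical helper, not part of the original file):
// the server name recorded above is just the URI path with a single leading
// '/' stripped, e.g. "/server.example.com" -> "server.example.com".
#include <string>
namespace {
std::string ServerNameFromUriPathSketch(const std::string& path) {
  return (!path.empty() && path[0] == '/') ? path.substr(1) : path;
}
}  // namespace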
1723
1724 XdsLb::~XdsLb() {
1725   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1726     gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this);
1727   }
1728   gpr_free((void*)server_name_);
1729   grpc_channel_args_destroy(args_);
1730   locality_list_.clear();
1731 }
1732
1733 void XdsLb::ShutdownLocked() {
1734   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1735     gpr_log(GPR_INFO, "[xdslb %p] shutting down", this);
1736   }
1737   shutting_down_ = true;
1738   if (fallback_at_startup_checks_pending_) {
1739     grpc_timer_cancel(&lb_fallback_timer_);
1740   }
1741   locality_map_.ShutdownLocked();
1742   if (fallback_policy_ != nullptr) {
1743     grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
1744                                      interested_parties());
1745   }
1746   if (pending_fallback_policy_ != nullptr) {
1747     grpc_pollset_set_del_pollset_set(
1748         pending_fallback_policy_->interested_parties(), interested_parties());
1749   }
1750   fallback_policy_.reset();
1751   pending_fallback_policy_.reset();
1752   // We reset the LB channels here instead of in our destructor because they
1753   // hold refs to XdsLb.
1754   lb_chand_.reset();
1755   pending_lb_chand_.reset();
1756 }
1757
1758 //
1759 // public methods
1760 //
1761
1762 void XdsLb::ResetBackoffLocked() {
1763   if (lb_chand_ != nullptr) {
1764     grpc_channel_reset_connect_backoff(lb_chand_->channel());
1765   }
1766   if (pending_lb_chand_ != nullptr) {
1767     grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
1768   }
1769   locality_map_.ResetBackoffLocked();
1770   if (fallback_policy_ != nullptr) {
1771     fallback_policy_->ResetBackoffLocked();
1772   }
1773   if (pending_fallback_policy_ != nullptr) {
1774     pending_fallback_policy_->ResetBackoffLocked();
1775   }
1776 }
1777
1778 void XdsLb::ProcessAddressesAndChannelArgsLocked(
1779     ServerAddressList addresses, const grpc_channel_args& args) {
1780   // Update fallback address list.
1781   fallback_backend_addresses_ = std::move(addresses);
1782   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1783   // since we use this to trigger the client_load_reporting filter.
1784   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1785   grpc_arg new_arg = grpc_channel_arg_string_create(
1786       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"xds");
1787   grpc_channel_args_destroy(args_);
1788   args_ = grpc_channel_args_copy_and_add_and_remove(
1789       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1790   // Construct args for balancer channel.
1791   grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args);
1792   // Create an LB channel if we don't have one yet or the balancer name has
1793   // changed from the last received one.
1794   bool create_lb_channel = lb_chand_ == nullptr;
1795   if (lb_chand_ != nullptr) {
1796     UniquePtr<char> last_balancer_name(
1797         grpc_channel_get_target(LatestLbChannel()->channel()));
1798     create_lb_channel =
1799         strcmp(last_balancer_name.get(), balancer_name_.get()) != 0;
1800   }
1801   if (create_lb_channel) {
1802     OrphanablePtr<LbChannelState> lb_chand = MakeOrphanable<LbChannelState>(
1803         Ref(DEBUG_LOCATION, "XdsLb+LbChannelState"), balancer_name_.get(),
1804         *lb_channel_args);
1805     if (lb_chand_ == nullptr || !lb_chand_->HasActiveEdsCall()) {
1806       GPR_ASSERT(pending_lb_chand_ == nullptr);
1807       // If we do not have a working LB channel yet, use the newly created one.
1808       lb_chand_ = std::move(lb_chand);
1809     } else {
1810       // Otherwise, wait until the new LB channel is ready to swap it in.
1811       pending_lb_chand_ = std::move(lb_chand);
1812     }
1813   }
1814   grpc_channel_args_destroy(lb_channel_args);
1815 }
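
// Illustrative sketch (hypothetical helper, not part of the original file):
// the create-vs-reuse decision above is a small predicate -- create a new LB
// channel when there is none yet, or when the configured balancer name
// differs from the target of the most recently created channel.
namespace {
bool ShouldCreateLbChannelSketch(const char* latest_channel_target,
                                 const char* balancer_name) {
  if (latest_channel_target == nullptr) return true;  // No LB channel yet.
  return strcmp(latest_channel_target, balancer_name) != 0;
}
}  // namespace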
1816
1817 void XdsLb::ParseLbConfig(const ParsedXdsConfig* xds_config) {
1818   if (xds_config == nullptr || xds_config->balancer_name() == nullptr) return;
1819   // TODO(yashykt) : does this need to be a gpr_strdup
1820   // TODO(juanlishen): Read balancer name from bootstrap file.
1821   balancer_name_ = UniquePtr<char>(gpr_strdup(xds_config->balancer_name()));
1822   child_policy_config_ = xds_config->child_policy();
1823   fallback_policy_config_ = xds_config->fallback_policy();
1824 }
1825
1826 void XdsLb::UpdateLocked(UpdateArgs args) {
1827   const bool is_initial_update = lb_chand_ == nullptr;
1828   ParseLbConfig(static_cast<const ParsedXdsConfig*>(args.config.get()));
1829   if (balancer_name_ == nullptr) {
1830     gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing failed.", this);
1831     return;
1832   }
1833   ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args);
1834   locality_map_.UpdateLocked(locality_list_, child_policy_config_.get(), args_,
1835                              this, is_initial_update);
1836   // Update the existing fallback policy. The fallback policy config and/or the
1837   // fallback addresses may be new.
1838   if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
1839   // If this is the initial update, start the fallback-at-startup checks.
1840   if (is_initial_update) {
1841     grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
1842     Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Held by closure
1843     GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
1844                       grpc_combiner_scheduler(combiner()));
1845     fallback_at_startup_checks_pending_ = true;
1846     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1847     // Start watching the channel's connectivity state.  If the channel
1848     // goes into state TRANSIENT_FAILURE, we go into fallback mode even if
1849     // the fallback timeout has not elapsed.
1850     lb_chand_->StartConnectivityWatchLocked();
1851   }
1852 }
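
// Illustrative sketch (hypothetical type, not part of the original file): the
// fallback-at-startup checks armed above have more than one trigger -- the
// fallback timer firing, the LB channel reporting TRANSIENT_FAILURE, or the
// balancer call finishing without a usable serverlist -- and they are
// disarmed once a child policy becomes usable.  The state involved is
// essentially one flag:
namespace {
struct FallbackAtStartupSketch {
  bool checks_pending = true;
  // Called when any trigger fires; returns true if fallback mode should be
  // entered now (i.e. the checks were still pending).
  bool OnTrigger() {
    if (!checks_pending) return false;
    checks_pending = false;
    return true;
  }
  // Called once the checks are no longer needed (e.g. a child reports READY).
  void Cancel() { checks_pending = false; }
};
}  // namespace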
1853
1854 //
1855 // fallback-related methods
1856 //
1857
1858 void XdsLb::MaybeCancelFallbackAtStartupChecks() {
1859   if (!fallback_at_startup_checks_pending_) return;
1860   gpr_log(GPR_INFO,
1861           "[xdslb %p] Cancelling fallback timer and LB channel connectivity "
1862           "watch",
1863           this);
1864   grpc_timer_cancel(&lb_fallback_timer_);
1865   lb_chand_->CancelConnectivityWatchLocked();
1866   fallback_at_startup_checks_pending_ = false;
1867 }
1868
1869 void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1870   XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
1871   // If some fallback-at-startup check is done after the timer fires but before
1872   // this callback actually runs, don't fall back.
1873   if (xdslb_policy->fallback_at_startup_checks_pending_ &&
1874       !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1875     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1876       gpr_log(GPR_INFO,
1877               "[xdslb %p] Child policy not ready after fallback timeout; "
1878               "entering fallback mode",
1879               xdslb_policy);
1880     }
1881     xdslb_policy->fallback_at_startup_checks_pending_ = false;
1882     xdslb_policy->UpdateFallbackPolicyLocked();
1883     xdslb_policy->lb_chand_->CancelConnectivityWatchLocked();
1884   }
1885   xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1886 }
1887
1888 void XdsLb::UpdateFallbackPolicyLocked() {
1889   if (shutting_down_) return;
1890   // Construct update args.
1891   UpdateArgs update_args;
1892   update_args.addresses = fallback_backend_addresses_;
1893   update_args.config = fallback_policy_config_ == nullptr
1894                            ? nullptr
1895                            : fallback_policy_config_->Ref();
1896   update_args.args = grpc_channel_args_copy(args_);
1897   // If the child policy name changes, we need to create a new child
1898   // policy.  When this happens, we leave child_policy_ as-is and store
1899   // the new child policy in pending_child_policy_.  Once the new child
1900   // policy transitions into state READY, we swap it into child_policy_,
1901   // replacing the original child policy.  So pending_child_policy_ is
1902   // non-null only between when we apply an update that changes the child
1903   // policy name and when the new child reports state READY.
1904   //
1905   // Updates can arrive at any point during this transition.  We always
1906   // apply updates relative to the most recently created child policy,
1907   // even if the most recent one is still in pending_child_policy_.  This
1908   // is true both when applying the updates to an existing child policy
1909   // and when determining whether we need to create a new policy.
1910   //
1911   // As a result of this, there are several cases to consider here:
1912   //
1913   // 1. We have no existing child policy (i.e., we have started up but
1914   //    have not yet received a serverlist from the balancer or gone
1915   //    into fallback mode; in this case, both child_policy_ and
1916   //    pending_child_policy_ are null).  In this case, we create a
1917   //    new child policy and store it in child_policy_.
1918   //
1919   // 2. We have an existing child policy and have no pending child policy
1920   //    from a previous update (i.e., either there has not been a
1921   //    previous update that changed the policy name, or we have already
1922   //    finished swapping in the new policy; in this case, child_policy_
1923   //    is non-null but pending_child_policy_ is null).  In this case:
1924   //    a. If child_policy_->name() equals child_policy_name, then we
1925   //       update the existing child policy.
1926   //    b. If child_policy_->name() does not equal child_policy_name,
1927   //       we create a new policy.  The policy will be stored in
1928   //       pending_child_policy_ and will later be swapped into
1929   //       child_policy_ by the helper when the new child transitions
1930   //       into state READY.
1931   //
1932   // 3. We have an existing child policy and have a pending child policy
1933   //    from a previous update (i.e., a previous update set
1934   //    pending_child_policy_ as per case 2b above and that policy has
1935   //    not yet transitioned into state READY and been swapped into
1936   //    child_policy_; in this case, both child_policy_ and
1937   //    pending_child_policy_ are non-null).  In this case:
1938   //    a. If pending_child_policy_->name() equals child_policy_name,
1939   //       then we update the existing pending child policy.
1940   //    b. If pending_child_policy->name() does not equal
1941   //       child_policy_name, then we create a new policy.  The new
1942   //       policy is stored in pending_child_policy_ (replacing the one
1943   //       that was there before, which will be immediately shut down)
1944   //       and will later be swapped into child_policy_ by the helper
1945   //       when the new child transitions into state READY.
1946   const char* fallback_policy_name = fallback_policy_config_ == nullptr
1947                                          ? "round_robin"
1948                                          : fallback_policy_config_->name();
1949   const bool create_policy =
1950       // case 1
1951       fallback_policy_ == nullptr ||
1952       // case 2b
1953       (pending_fallback_policy_ == nullptr &&
1954        strcmp(fallback_policy_->name(), fallback_policy_name) != 0) ||
1955       // case 3b
1956       (pending_fallback_policy_ != nullptr &&
1957        strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0);
1958   LoadBalancingPolicy* policy_to_update = nullptr;
1959   if (create_policy) {
1960     // Cases 1, 2b, and 3b: create a new child policy.
1961     // If fallback_policy_ is null, we set it (case 1), else we set
1962     // pending_fallback_policy_ (cases 2b and 3b).
1963     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1964       gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this,
1965               fallback_policy_ == nullptr ? "" : "pending ",
1966               fallback_policy_name);
1967     }
1968     auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
1969                                                   : pending_fallback_policy_;
1970     lb_policy =
1971         CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
1972     policy_to_update = lb_policy.get();
1973   } else {
1974     // Cases 2a and 3a: update an existing policy.
1975     // If we have a pending fallback policy, send the update to the pending
1976     // policy (case 3a), else send it to the current policy (case 2a).
1977     policy_to_update = pending_fallback_policy_ != nullptr
1978                            ? pending_fallback_policy_.get()
1979                            : fallback_policy_.get();
1980   }
1981   GPR_ASSERT(policy_to_update != nullptr);
1982   // Update the policy.
1983   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
1984     gpr_log(
1985         GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this,
1986         policy_to_update == pending_fallback_policy_.get() ? "pending " : "",
1987         policy_to_update);
1988   }
1989   policy_to_update->UpdateLocked(std::move(update_args));
1990 }
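
// Illustrative sketch (hypothetical helper, not part of the original file):
// the three "create" cases enumerated in the comment above (1, 2b, and 3b)
// reduce to one predicate over the current policy's name, the pending
// policy's name, and the requested name, where nullptr stands for "no such
// policy".
namespace {
bool NeedNewPolicySketch(const char* current_name, const char* pending_name,
                         const char* requested_name) {
  if (current_name == nullptr) return true;  // case 1
  if (pending_name == nullptr) {
    return strcmp(current_name, requested_name) != 0;  // case 2b
  }
  return strcmp(pending_name, requested_name) != 0;  // case 3b
}
}  // namespace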
1991
1992 OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked(
1993     const char* name, const grpc_channel_args* args) {
1994   FallbackHelper* helper =
1995       New<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper"));
1996   LoadBalancingPolicy::Args lb_policy_args;
1997   lb_policy_args.combiner = combiner();
1998   lb_policy_args.args = args;
1999   lb_policy_args.channel_control_helper =
2000       UniquePtr<ChannelControlHelper>(helper);
2001   OrphanablePtr<LoadBalancingPolicy> lb_policy =
2002       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
2003           name, std::move(lb_policy_args));
2004   if (GPR_UNLIKELY(lb_policy == nullptr)) {
2005     gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this,
2006             name);
2007     return nullptr;
2008   }
2009   helper->set_child(lb_policy.get());
2010   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2011     gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this,
2012             name, lb_policy.get());
2013   }
2014   // Add the xDS's interested_parties pollset_set to that of the newly created
2015   // child policy. This will make the child policy progress upon activity on xDS
2016   // LB, which in turn is tied to the application's call.
2017   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2018                                    interested_parties());
2019   return lb_policy;
2020 }
2021
2022 void XdsLb::MaybeExitFallbackMode() {
2023   if (fallback_policy_ == nullptr) return;
2024   gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this);
2025   fallback_policy_.reset();
2026   pending_fallback_policy_.reset();
2027 }
2028
2029 //
2030 // XdsLb::LocalityMap
2031 //
2032
2033 void XdsLb::LocalityMap::UpdateLocked(
2034     const XdsLocalityList& locality_list,
2035     LoadBalancingPolicy::Config* child_policy_config,
2036     const grpc_channel_args* args, XdsLb* parent, bool is_initial_update) {
2037   if (parent->shutting_down_) return;
2038   // Add or update the localities in locality_list.
2039   for (size_t i = 0; i < locality_list.size(); i++) {
2040     auto& locality_name = locality_list[i].locality_name;
2041     auto iter = map_.find(locality_name);
2042     // Add a new entry in the locality map if a new locality is received in the
2043     // locality list.
2044     if (iter == map_.end()) {
2045       OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>(
2046           parent->Ref(DEBUG_LOCATION, "LocalityEntry"), locality_name);
2047       iter = map_.emplace(locality_name, std::move(new_entry)).first;
2048     }
2049     // Keep a copy of the serverlist in locality_list_ so that we can compare
2050     // it with future ones.
2051     iter->second->UpdateLocked(locality_list[i].lb_weight,
2052                                locality_list[i].serverlist, child_policy_config,
2053                                args);
2054   }
2055   // Remove (later) the localities not in locality_list.
2056   for (auto& p : map_) {
2057     const XdsLocalityName* locality_name = p.first.get();
2058     LocalityEntry* locality_entry = p.second.get();
2059     bool in_locality_list = false;
2060     for (size_t i = 0; i < locality_list.size(); ++i) {
2061       if (*locality_list[i].locality_name == *locality_name) {
2062         in_locality_list = true;
2063         break;
2064       }
2065     }
2066     if (!in_locality_list) locality_entry->DeactivateLocked();
2067   }
2068   // Generate a new xds picker immediately.
2069   if (!is_initial_update) UpdateXdsPickerLocked();
2070 }
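
// Illustrative sketch (plain C++, hypothetical names, not part of the
// original file): the map maintenance above is "upsert every locality that
// appears in the new list, then deactivate every existing entry that does
// not".  With std containers and strings standing in for locality names and
// weights:
#include <map>
#include <set>
#include <string>
namespace {
std::set<std::string> UpdateLocalityMapSketch(
    std::map<std::string, int>* locality_map,  // name -> weight
    const std::map<std::string, int>& new_list) {
  // Add or update every locality present in the new list.
  for (const auto& kv : new_list) (*locality_map)[kv.first] = kv.second;
  // Collect the entries no longer listed; the caller deactivates them.
  std::set<std::string> to_deactivate;
  for (const auto& kv : *locality_map) {
    if (new_list.find(kv.first) == new_list.end()) {
      to_deactivate.insert(kv.first);
    }
  }
  return to_deactivate;
}
}  // namespace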
2071
2072 void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
2073   // If we are in fallback mode, don't generate an xds picker from localities.
2074   if (xds_policy_->fallback_policy_ != nullptr) return;
2075   // Construct a new xds picker which maintains a map of all locality pickers
2076   // that are ready. Each locality is represented by a portion of the range
2077   // proportional to its weight, such that the total range is the sum of the
2078   // weights of all localities.
2079   uint32_t end = 0;
2080   size_t num_connecting = 0;
2081   size_t num_idle = 0;
2082   size_t num_transient_failures = 0;
2083   Picker::PickerList pickers;
2084   for (auto& p : map_) {
2085     const LocalityEntry* entry = p.second.get();
2086     if (entry->locality_weight() == 0) continue;
2087     switch (entry->connectivity_state()) {
2088       case GRPC_CHANNEL_READY: {
2089         end += entry->locality_weight();
2090         pickers.push_back(MakePair(end, entry->picker_wrapper()));
2091         break;
2092       }
2093       case GRPC_CHANNEL_CONNECTING: {
2094         num_connecting++;
2095         break;
2096       }
2097       case GRPC_CHANNEL_IDLE: {
2098         num_idle++;
2099         break;
2100       }
2101       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
2102         num_transient_failures++;
2103         break;
2104       }
2105       default:
2106         GPR_UNREACHABLE_CODE(return );
2107     }
2108   }
2109   // Pass on the constructed xds picker if it has any ready pickers in its map.
2110   // Otherwise, pass a QueuePicker if any of the locality pickers are in a
2111   // connecting or idle state. Finally, return a transient failure picker if
2112   // all locality pickers are in transient failure.
2113   if (!pickers.empty()) {
2114     xds_policy_->channel_control_helper()->UpdateState(
2115         GRPC_CHANNEL_READY,
2116         UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
2117             New<Picker>(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"),
2118                         std::move(pickers))));
2119   } else if (num_connecting > 0) {
2120     xds_policy_->channel_control_helper()->UpdateState(
2121         GRPC_CHANNEL_CONNECTING,
2122         UniquePtr<SubchannelPicker>(
2123             New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
2124   } else if (num_idle > 0) {
2125     xds_policy_->channel_control_helper()->UpdateState(
2126         GRPC_CHANNEL_IDLE,
2127         UniquePtr<SubchannelPicker>(
2128             New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
2129   } else {
2130     grpc_error* error =
2131         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2132                                "connections to all active localities failing"),
2133                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2134     xds_policy_->channel_control_helper()->UpdateState(
2135         GRPC_CHANNEL_TRANSIENT_FAILURE,
2136         UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
2137   }
2138 }
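
// Illustrative sketch (plain C++, hypothetical names, not part of the
// original file): the READY branch above records each locality's cumulative
// weight "end", so choosing a locality amounts to drawing a key uniformly in
// [0, total_weight) and finding the first entry whose end exceeds it.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>
namespace {
// pickers[i].first is the strictly increasing cumulative weight end of
// locality i; pickers[i].second stands in for that locality's picker.
// key must lie in [0, pickers.back().first).
size_t PickLocalitySketch(
    const std::vector<std::pair<uint32_t, int>>& pickers, uint32_t key) {
  auto it = std::upper_bound(
      pickers.begin(), pickers.end(), key,
      [](uint32_t k, const std::pair<uint32_t, int>& p) {
        return k < p.first;
      });
  return static_cast<size_t>(it - pickers.begin());
}
}  // namespace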
2139
2140 void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); }
2141
2142 void XdsLb::LocalityMap::ResetBackoffLocked() {
2143   for (auto& p : map_) {
2144     p.second->ResetBackoffLocked();
2145   }
2146 }
2147
2148 //
2149 // XdsLb::LocalityMap::LocalityEntry
2150 //
2151
2152 XdsLb::LocalityMap::LocalityEntry::LocalityEntry(
2153     RefCountedPtr<XdsLb> parent, RefCountedPtr<XdsLocalityName> name)
2154     : parent_(std::move(parent)), name_(std::move(name)) {
2155   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2156     gpr_log(GPR_INFO, "[xdslb %p] created LocalityEntry %p for %s",
2157             parent_.get(), this, name_->AsHumanReadableString());
2158   }
2159   GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
2160                     this, grpc_combiner_scheduler(parent_->combiner()));
2161 }
2162
2163 XdsLb::LocalityMap::LocalityEntry::~LocalityEntry() {
2164   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2165     gpr_log(GPR_INFO,
2166             "[xdslb %p] LocalityEntry %p %s: destroying locality entry",
2167             parent_.get(), this, name_->AsHumanReadableString());
2168   }
2169   parent_.reset(DEBUG_LOCATION, "LocalityEntry");
2170 }
2171
2172 grpc_channel_args*
2173 XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked(
2174     const grpc_channel_args* args_in) {
2175   const grpc_arg args_to_add[] = {
2176       // A channel arg indicating if the target is a backend inferred from an
2177       // xds load balancer.
2178       grpc_channel_arg_integer_create(
2179           const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
2180           1),
2181       // Inhibit client-side health checking, since the balancer does
2182       // this for us.
2183       grpc_channel_arg_integer_create(
2184           const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
2185   };
2186   return grpc_channel_args_copy_and_add(args_in, args_to_add,
2187                                         GPR_ARRAY_SIZE(args_to_add));
2188 }
2189
2190 OrphanablePtr<LoadBalancingPolicy>
2191 XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
2192     const char* name, const grpc_channel_args* args) {
2193   Helper* helper = New<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
2194   LoadBalancingPolicy::Args lb_policy_args;
2195   lb_policy_args.combiner = parent_->combiner();
2196   lb_policy_args.args = args;
2197   lb_policy_args.channel_control_helper =
2198       UniquePtr<ChannelControlHelper>(helper);
2199   OrphanablePtr<LoadBalancingPolicy> lb_policy =
2200       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
2201           name, std::move(lb_policy_args));
2202   if (GPR_UNLIKELY(lb_policy == nullptr)) {
2203     gpr_log(GPR_ERROR,
2204             "[xdslb %p] LocalityEntry %p %s: failure creating child policy %s",
2205             parent_.get(), this, name_->AsHumanReadableString(), name);
2206     return nullptr;
2207   }
2208   helper->set_child(lb_policy.get());
2209   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2210     gpr_log(GPR_INFO,
2211             "[xdslb %p] LocalityEntry %p %s: Created new child policy %s (%p)",
2212             parent_.get(), this, name_->AsHumanReadableString(), name,
2213             lb_policy.get());
2214   }
2215   // Add the xDS's interested_parties pollset_set to that of the newly created
2216   // child policy. This will make the child policy progress upon activity on xDS
2217   // LB, which in turn is tied to the application's call.
2218   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2219                                    parent_->interested_parties());
2220   return lb_policy;
2221 }
2222
2223 void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
2224     uint32_t locality_weight, ServerAddressList serverlist,
2225     LoadBalancingPolicy::Config* child_policy_config,
2226     const grpc_channel_args* args_in) {
2227   if (parent_->shutting_down_) return;
2228   // Update locality weight.
2229   locality_weight_ = locality_weight;
2230   if (delayed_removal_timer_callback_pending_) {
2231     grpc_timer_cancel(&delayed_removal_timer_);
2232   }
2233   // Construct update args.
2234   UpdateArgs update_args;
2235   update_args.addresses = std::move(serverlist);
2236   update_args.config =
2237       child_policy_config == nullptr ? nullptr : child_policy_config->Ref();
2238   update_args.args = CreateChildPolicyArgsLocked(args_in);
2239   // If the child policy name changes, we need to create a new child
2240   // policy.  When this happens, we leave child_policy_ as-is and store
2241   // the new child policy in pending_child_policy_.  Once the new child
2242   // policy transitions into state READY, we swap it into child_policy_,
2243   // replacing the original child policy.  So pending_child_policy_ is
2244   // non-null only between when we apply an update that changes the child
2245   // policy name and when the new child reports state READY.
2246   //
2247   // Updates can arrive at any point during this transition.  We always
2248   // apply updates relative to the most recently created child policy,
2249   // even if the most recent one is still in pending_child_policy_.  This
2250   // is true both when applying the updates to an existing child policy
2251   // and when determining whether we need to create a new policy.
2252   //
2253   // As a result of this, there are several cases to consider here:
2254   //
2255   // 1. We have no existing child policy (i.e., we have started up but
2256   //    have not yet received a serverlist from the balancer or gone
2257   //    into fallback mode; in this case, both child_policy_ and
2258   //    pending_child_policy_ are null).  In this case, we create a
2259   //    new child policy and store it in child_policy_.
2260   //
2261   // 2. We have an existing child policy and have no pending child policy
2262   //    from a previous update (i.e., either there has not been a
2263   //    previous update that changed the policy name, or we have already
2264   //    finished swapping in the new policy; in this case, child_policy_
2265   //    is non-null but pending_child_policy_ is null).  In this case:
2266   //    a. If child_policy_->name() equals child_policy_name, then we
2267   //       update the existing child policy.
2268   //    b. If child_policy_->name() does not equal child_policy_name,
2269   //       we create a new policy.  The policy will be stored in
2270   //       pending_child_policy_ and will later be swapped into
2271   //       child_policy_ by the helper when the new child transitions
2272   //       into state READY.
2273   //
2274   // 3. We have an existing child policy and have a pending child policy
2275   //    from a previous update (i.e., a previous update set
2276   //    pending_child_policy_ as per case 2b above and that policy has
2277   //    not yet transitioned into state READY and been swapped into
2278   //    child_policy_; in this case, both child_policy_ and
2279   //    pending_child_policy_ are non-null).  In this case:
2280   //    a. If pending_child_policy_->name() equals child_policy_name,
2281   //       then we update the existing pending child policy.
2282   //    b. If pending_child_policy->name() does not equal
2283   //       child_policy_name, then we create a new policy.  The new
2284   //       policy is stored in pending_child_policy_ (replacing the one
2285   //       that was there before, which will be immediately shut down)
2286   //       and will later be swapped into child_policy_ by the helper
2287   //       when the new child transitions into state READY.
2288   // TODO(juanlishen): If the child policy is not configured via service config,
2289   // use whatever algorithm is specified by the balancer.
2290   const char* child_policy_name = child_policy_config == nullptr
2291                                       ? "round_robin"
2292                                       : child_policy_config->name();
2293   const bool create_policy =
2294       // case 1
2295       child_policy_ == nullptr ||
2296       // case 2b
2297       (pending_child_policy_ == nullptr &&
2298        strcmp(child_policy_->name(), child_policy_name) != 0) ||
2299       // case 3b
2300       (pending_child_policy_ != nullptr &&
2301        strcmp(pending_child_policy_->name(), child_policy_name) != 0);
2302   LoadBalancingPolicy* policy_to_update = nullptr;
2303   if (create_policy) {
2304     // Cases 1, 2b, and 3b: create a new child policy.
2305     // If child_policy_ is null, we set it (case 1), else we set
2306     // pending_child_policy_ (cases 2b and 3b).
2307     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2308       gpr_log(GPR_INFO,
2309               "[xdslb %p] LocalityEntry %p %s: Creating new %schild policy %s",
2310               parent_.get(), this, name_->AsHumanReadableString(),
2311               child_policy_ == nullptr ? "" : "pending ", child_policy_name);
2312     }
2313     auto& lb_policy =
2314         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
2315     lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
2316     policy_to_update = lb_policy.get();
2317   } else {
2318     // Cases 2a and 3a: update an existing policy.
2319     // If we have a pending child policy, send the update to the pending
2320     // policy (case 3a), else send it to the current policy (case 2a).
2321     policy_to_update = pending_child_policy_ != nullptr
2322                            ? pending_child_policy_.get()
2323                            : child_policy_.get();
2324   }
2325   GPR_ASSERT(policy_to_update != nullptr);
2326   // Update the policy.
2327   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2328     gpr_log(GPR_INFO,
2329             "[xdslb %p] LocalityEntry %p %s: Updating %schild policy %p",
2330             parent_.get(), this, name_->AsHumanReadableString(),
2331             policy_to_update == pending_child_policy_.get() ? "pending " : "",
2332             policy_to_update);
2333   }
2334   policy_to_update->UpdateLocked(std::move(update_args));
2335 }
2336
2337 void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() {
2338   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2339     gpr_log(GPR_INFO,
2340             "[xdslb %p] LocalityEntry %p %s: shutting down locality entry",
2341             parent_.get(), this, name_->AsHumanReadableString());
2342   }
2343   // Remove the child policy's interested_parties pollset_set from the
2344   // xDS policy.
2345   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
2346                                    parent_->interested_parties());
2347   child_policy_.reset();
2348   if (pending_child_policy_ != nullptr) {
2349     grpc_pollset_set_del_pollset_set(
2350         pending_child_policy_->interested_parties(),
2351         parent_->interested_parties());
2352     pending_child_policy_.reset();
2353   }
2354   // Drop our ref to the child's picker, in case it's holding a ref to
2355   // the child.
2356   picker_wrapper_.reset();
2357   if (delayed_removal_timer_callback_pending_) {
2358     grpc_timer_cancel(&delayed_removal_timer_);
2359   }
2360 }
2361
2362 void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
2363   child_policy_->ResetBackoffLocked();
2364   if (pending_child_policy_ != nullptr) {
2365     pending_child_policy_->ResetBackoffLocked();
2366   }
2367 }
2368
2369 void XdsLb::LocalityMap::LocalityEntry::Orphan() {
2370   ShutdownLocked();
2371   Unref();
2372 }
2373
2374 void XdsLb::LocalityMap::LocalityEntry::DeactivateLocked() {
2375   // If locality retention is disabled, delete the locality immediately.
2376   if (parent_->locality_retention_interval_ms_ == 0) {
2377     parent_->locality_map_.map_.erase(name_);
2378     return;
2379   }
2380   // If already deactivated, don't do that again.
2381   if (locality_weight_ == 0) return;
2382   // Set the locality weight to 0 so that future xds pickers won't contain
2383   // this locality.
2384   locality_weight_ = 0;
2385   // Start a timer to delete the locality.
2386   Ref(DEBUG_LOCATION, "LocalityEntry+timer").release();
2387   grpc_timer_init(
2388       &delayed_removal_timer_,
2389       ExecCtx::Get()->Now() + parent_->locality_retention_interval_ms_,
2390       &on_delayed_removal_timer_);
2391   delayed_removal_timer_callback_pending_ = true;
2392 }
2393
2394 void XdsLb::LocalityMap::LocalityEntry::OnDelayedRemovalTimerLocked(
2395     void* arg, grpc_error* error) {
2396   LocalityEntry* self = static_cast<LocalityEntry*>(arg);
2397   self->delayed_removal_timer_callback_pending_ = false;
2398   if (error == GRPC_ERROR_NONE && self->locality_weight_ == 0) {
2399     self->parent_->locality_map_.map_.erase(self->name_);
2400   }
2401   self->Unref(DEBUG_LOCATION, "LocalityEntry+timer");
2402 }
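
// Illustrative sketch (hypothetical type, not part of the original file):
// locality retention above is a two-step removal -- deactivation zeroes the
// weight and arms a removal timer, the entry is erased only if the weight is
// still zero when that timer fires, and an update that reuses the locality
// restores a non-zero weight and cancels the pending removal.  (The
// retention-disabled and timer-cancellation-error paths are omitted.)
namespace {
struct LocalityRetentionSketch {
  uint32_t weight = 1;
  bool removal_timer_pending = false;
  void Deactivate() {
    if (weight == 0) return;       // Already deactivated.
    weight = 0;                    // Future pickers will skip this locality.
    removal_timer_pending = true;  // Stands in for grpc_timer_init().
  }
  void Reuse(uint32_t new_weight) {
    weight = new_weight;
    removal_timer_pending = false;  // Stands in for grpc_timer_cancel().
  }
  // Returns true if the entry should now be erased from the map.
  bool OnRemovalTimer() {
    removal_timer_pending = false;
    return weight == 0;
  }
};
}  // namespace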
2403
2404 //
2405 // XdsLb::LocalityEntry::Helper
2406 //
2407
2408 bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
2409   GPR_ASSERT(child_ != nullptr);
2410   return child_ == entry_->pending_child_policy_.get();
2411 }
2412
2413 bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
2414   GPR_ASSERT(child_ != nullptr);
2415   return child_ == entry_->child_policy_.get();
2416 }
2417
2418 RefCountedPtr<SubchannelInterface>
2419 XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
2420     const grpc_channel_args& args) {
2421   if (entry_->parent_->shutting_down_ ||
2422       (!CalledByPendingChild() && !CalledByCurrentChild())) {
2423     return nullptr;
2424   }
2425   return entry_->parent_->channel_control_helper()->CreateSubchannel(args);
2426 }
2427
2428 void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
2429     grpc_connectivity_state state, UniquePtr<SubchannelPicker> picker) {
2430   if (entry_->parent_->shutting_down_) return;
2431   // If this request is from the pending child policy, ignore it until
2432   // it reports READY, at which point we swap it into place.
2433   if (CalledByPendingChild()) {
2434     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2435       gpr_log(GPR_INFO,
2436               "[xdslb %p helper %p] pending child policy %p reports state=%s",
2437               entry_->parent_.get(), this, entry_->pending_child_policy_.get(),
2438               grpc_connectivity_state_name(state));
2439     }
2440     if (state != GRPC_CHANNEL_READY) return;
2441     grpc_pollset_set_del_pollset_set(
2442         entry_->child_policy_->interested_parties(),
2443         entry_->parent_->interested_parties());
2444     entry_->child_policy_ = std::move(entry_->pending_child_policy_);
2445   } else if (!CalledByCurrentChild()) {
2446     // This request is from an outdated child, so ignore it.
2447     return;
2448   }
2449   // At this point, child_ must be the current child policy.
2450   if (state == GRPC_CHANNEL_READY) {
2451     entry_->parent_->MaybeCancelFallbackAtStartupChecks();
2452     entry_->parent_->MaybeExitFallbackMode();
2453   }
2454   GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
2455   // Cache the picker and its state in the entry.
2456   entry_->picker_wrapper_ = MakeRefCounted<PickerWrapper>(
2457       std::move(picker),
2458       entry_->parent_->client_stats_.FindLocalityStats(entry_->name_));
2459   entry_->connectivity_state_ = state;
2460   // Construct a new xds picker and pass it to the channel.
2461   entry_->parent_->locality_map_.UpdateXdsPickerLocked();
2462 }
2463
2464 void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {
2465   if (entry_->parent_->shutting_down_) return;
2466   // If there is a pending child policy, ignore re-resolution requests
2467   // from the current child policy (or any outdated child).
2468   if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
2469     return;
2470   }
2471   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
2472     gpr_log(GPR_INFO,
2473             "[xdslb %p] Re-resolution requested from the internal child policy "
2474             "(%p).",
2475             entry_->parent_.get(), entry_->child_policy_.get());
2476   }
2477   GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
2478   // If we are talking to a balancer, we expect to get updated addresses
2479   // from the balancer, so we can ignore the re-resolution request from
2480   // the child policy. Otherwise, pass the re-resolution request up to the
2481   // channel.
2482   if (entry_->parent_->lb_chand_->eds_calld() == nullptr ||
2483       !entry_->parent_->lb_chand_->eds_calld()->seen_response()) {
2484     entry_->parent_->channel_control_helper()->RequestReresolution();
2485   }
2486 }
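// In other words, re-resolution is only passed up to the channel while we
// have not yet received a serverlist from the balancer (no EDS call, or an
// EDS call that has not seen a response); once the balancer is feeding us
// addresses, the child's re-resolution requests are redundant and swallowed.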
2487
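// Forwards trace events from the child policy to the parent's helper,
// subject to the same shutdown / stale-child guard used in
// CreateSubchannel() above.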
2488 void XdsLb::LocalityMap::LocalityEntry::Helper::AddTraceEvent(
2489     TraceSeverity severity, StringView message) {
2490   if (entry_->parent_->shutting_down_ ||
2491       (!CalledByPendingChild() && !CalledByCurrentChild())) {
2492     return;
2493   }
2494   entry_->parent_->channel_control_helper()->AddTraceEvent(severity, message);
2495 }
2496
2497 //
2498 // factory
2499 //
2500
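// For reference, a sketch (not taken from this file) of a service config
// fragment that ParseLoadBalancingConfig() below would accept, with the
// policy keyed by whatever name kXds resolves to:
//
//   "loadBalancingConfig": [
//     { "<kXds>": {
//         "balancerName": "dns:///balancer.example.com:443",
//         "childPolicy": [ { "round_robin": {} } ],
//         "fallbackPolicy": [ { "pick_first": {} } ]
//     } }
//   ]
//
// balancerName must be a JSON string; childPolicy and fallbackPolicy are
// optional and are parsed through LoadBalancingPolicyRegistry.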
2501 class XdsFactory : public LoadBalancingPolicyFactory {
2502  public:
2503   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
2504       LoadBalancingPolicy::Args args) const override {
2505     return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args)));
2506   }
2507
2508   const char* name() const override { return kXds; }
2509
2510   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
2511       const grpc_json* json, grpc_error** error) const override {
2512     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
2513     if (json == nullptr) {
2514       // xds was mentioned as a policy in the deprecated loadBalancingPolicy
2515       // field or in the client API.
2516       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2517           "field:loadBalancingPolicy error:Xds Parser has required field - "
2518           "balancerName. Please use loadBalancingConfig field of service "
2519           "config instead.");
2520       return nullptr;
2521     }
2522     GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
2523
2524     InlinedVector<grpc_error*, 3> error_list;
2525     const char* balancer_name = nullptr;
2526     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
2527     RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy;
2528     for (const grpc_json* field = json->child; field != nullptr;
2529          field = field->next) {
2530       if (field->key == nullptr) continue;
2531       if (strcmp(field->key, "balancerName") == 0) {
2532         if (balancer_name != nullptr) {
2533           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2534               "field:balancerName error:Duplicate entry"));
2535         }
2536         if (field->type != GRPC_JSON_STRING) {
2537           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2538               "field:balancerName error:type should be string"));
2539           continue;
2540         }
2541         balancer_name = field->value;
2542       } else if (strcmp(field->key, "childPolicy") == 0) {
2543         if (child_policy != nullptr) {
2544           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2545               "field:childPolicy error:Duplicate entry"));
2546         }
2547         grpc_error* parse_error = GRPC_ERROR_NONE;
2548         child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2549             field, &parse_error);
2550         if (child_policy == nullptr) {
2551           GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
2552           error_list.push_back(parse_error);
2553         }
2554       } else if (strcmp(field->key, "fallbackPolicy") == 0) {
2555         if (fallback_policy != nullptr) {
2556           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2557               "field:fallbackPolicy error:Duplicate entry"));
2558         }
2559         grpc_error* parse_error = GRPC_ERROR_NONE;
2560         fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
2561             field, &parse_error);
2562         if (fallback_policy == nullptr) {
2563           GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
2564           error_list.push_back(parse_error);
2565         }
2566       }
2567     }
2568     if (error_list.empty()) {
2569       return RefCountedPtr<LoadBalancingPolicy::Config>(New<ParsedXdsConfig>(
2570           balancer_name, std::move(child_policy), std::move(fallback_policy)));
2571     } else {
2572       *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list);
2573       return nullptr;
2574     }
2575   }
2576 };
2577
2578 }  // namespace
2579
2580 }  // namespace grpc_core
2581
2582 //
2583 // Plugin registration
2584 //
2585
2586 void grpc_lb_policy_xds_init() {
2587   grpc_core::LoadBalancingPolicyRegistry::Builder::
2588       RegisterLoadBalancingPolicyFactory(
2589           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
2590               grpc_core::New<grpc_core::XdsFactory>()));
2591 }
2592
2593 void grpc_lb_policy_xds_shutdown() {}
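// These hooks are normally invoked through gRPC's plugin machinery during
// grpc_init()/grpc_shutdown(). Once the factory is registered, the policy is
// selected by naming it (via kXds) under loadBalancingConfig in the service
// config, as sketched above the factory.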