Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / lb_policy / grpclb / grpclb.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
24 /// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose).  The call
31 /// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down).  If needed, we retry the
33 /// call.  If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends.  Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses.  If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
43 /// resolver.
44 ///
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance.  Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
50 ///
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
53
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
59
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
62
63 #include <inttypes.h>
64 #include <limits.h>
65 #include <string.h>
66
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
72
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/string.h"
88 #include "src/core/lib/gprpp/manual_constructor.h"
89 #include "src/core/lib/gprpp/memory.h"
90 #include "src/core/lib/gprpp/orphanable.h"
91 #include "src/core/lib/gprpp/ref_counted_ptr.h"
92 #include "src/core/lib/iomgr/combiner.h"
93 #include "src/core/lib/iomgr/sockaddr.h"
94 #include "src/core/lib/iomgr/sockaddr_utils.h"
95 #include "src/core/lib/iomgr/timer.h"
96 #include "src/core/lib/slice/slice_hash_table.h"
97 #include "src/core/lib/slice/slice_internal.h"
98 #include "src/core/lib/slice/slice_string_helpers.h"
99 #include "src/core/lib/surface/call.h"
100 #include "src/core/lib/surface/channel.h"
101 #include "src/core/lib/surface/channel_init.h"
102 #include "src/core/lib/transport/static_metadata.h"
103
// Tuning constants for the balancer-call retry backoff and for the
// fallback-at-startup timeout.
// NOTE(review): the use sites are outside this chunk; presumably these feed
// lb_call_backoff_ and fallback_at_startup_timeout_ -- confirm.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000

// Keys of the per-address channel args that carry the LB token and the
// client stats object; written by Serverlist::GetServerAddressList() and
// read back by Picker::Pick().
#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
#define GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS "grpc.grpclb_address_client_stats"
112
113 namespace grpc_core {
114
// Trace flag named "glb", off by default; checked via
// GRPC_TRACE_FLAG_ENABLED() before the verbose logging in this file.
TraceFlag grpc_lb_glb_trace(false, "glb");

// Initial-metadata keys added by GrpcLb::Picker::Pick(): one for the client
// stats pointer and one for the LB token of the picked subchannel.
const char kGrpcLbClientStatsMetadataKey[] = "grpclb_client_stats";
const char kGrpcLbLbTokenMetadataKey[] = "lb-token";
119
120 namespace {
121
// Policy name returned by the name() methods of GrpcLb and ParsedGrpcLbConfig.
constexpr char kGrpclb[] = "grpclb";
123
124 class ParsedGrpcLbConfig : public LoadBalancingPolicy::Config {
125  public:
126   explicit ParsedGrpcLbConfig(
127       RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
128       : child_policy_(std::move(child_policy)) {}
129   const char* name() const override { return kGrpclb; }
130
131   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
132     return child_policy_;
133   }
134
135  private:
136   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
137 };
138
139 class GrpcLb : public LoadBalancingPolicy {
140  public:
141   explicit GrpcLb(Args args);
142
143   const char* name() const override { return kGrpclb; }
144
145   void UpdateLocked(UpdateArgs args) override;
146   void ResetBackoffLocked() override;
147
148  private:
149   /// Contains a call to the LB server and all the data related to the call.
150   class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
151    public:
152     explicit BalancerCallState(
153         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
154
155     // It's the caller's responsibility to ensure that Orphan() is called from
156     // inside the combiner.
157     void Orphan() override;
158
159     void StartQuery();
160
161     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
162
163     bool seen_initial_response() const { return seen_initial_response_; }
164     bool seen_serverlist() const { return seen_serverlist_; }
165
166    private:
167     GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
168
169     ~BalancerCallState();
170
171     GrpcLb* grpclb_policy() const {
172       return static_cast<GrpcLb*>(grpclb_policy_.get());
173     }
174
175     void ScheduleNextClientLoadReportLocked();
176     void SendClientLoadReportLocked();
177
178     static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
179
180     static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
181     static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
182     static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
183     static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
184     static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
185
186     // The owning LB policy.
187     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
188
189     // The streaming call to the LB server. Always non-NULL.
190     grpc_call* lb_call_ = nullptr;
191
192     // recv_initial_metadata
193     grpc_metadata_array lb_initial_metadata_recv_;
194
195     // send_message
196     grpc_byte_buffer* send_message_payload_ = nullptr;
197     grpc_closure lb_on_initial_request_sent_;
198
199     // recv_message
200     grpc_byte_buffer* recv_message_payload_ = nullptr;
201     grpc_closure lb_on_balancer_message_received_;
202     bool seen_initial_response_ = false;
203     bool seen_serverlist_ = false;
204
205     // recv_trailing_metadata
206     grpc_closure lb_on_balancer_status_received_;
207     grpc_metadata_array lb_trailing_metadata_recv_;
208     grpc_status_code lb_call_status_;
209     grpc_slice lb_call_status_details_;
210
211     // The stats for client-side load reporting associated with this LB call.
212     // Created after the first serverlist is received.
213     RefCountedPtr<GrpcLbClientStats> client_stats_;
214     grpc_millis client_stats_report_interval_ = 0;
215     grpc_timer client_load_report_timer_;
216     bool client_load_report_timer_callback_pending_ = false;
217     bool last_client_load_report_counters_were_zero_ = false;
218     bool client_load_report_is_due_ = false;
219     // The closure used for either the load report timer or the callback for
220     // completion of sending the load report.
221     grpc_closure client_load_report_closure_;
222   };
223
224   class Serverlist : public RefCounted<Serverlist> {
225    public:
226     // Takes ownership of serverlist.
227     explicit Serverlist(grpc_grpclb_serverlist* serverlist)
228         : serverlist_(serverlist) {}
229
230     ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
231
232     bool operator==(const Serverlist& other) const;
233
234     const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
235
236     // Returns a text representation suitable for logging.
237     UniquePtr<char> AsText() const;
238
239     // Extracts all non-drop entries into a ServerAddressList.
240     ServerAddressList GetServerAddressList(
241         GrpcLbClientStats* client_stats) const;
242
243     // Returns true if the serverlist contains at least one drop entry and
244     // no backend address entries.
245     bool ContainsAllDropEntries() const;
246
247     // Returns the LB token to use for a drop, or null if the call
248     // should not be dropped.
249     //
250     // Note: This is called from the picker, so it will be invoked in
251     // the channel's data plane combiner, NOT the control plane
252     // combiner.  It should not be accessed by any other part of the LB
253     // policy.
254     const char* ShouldDrop();
255
256    private:
257     grpc_grpclb_serverlist* serverlist_;
258
259     // Guarded by the channel's data plane combiner, NOT the control
260     // plane combiner.  It should not be accessed by anything but the
261     // picker via the ShouldDrop() method.
262     size_t drop_index_ = 0;
263   };
264
265   class Picker : public SubchannelPicker {
266    public:
267     Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
268            UniquePtr<SubchannelPicker> child_picker,
269            RefCountedPtr<GrpcLbClientStats> client_stats)
270         : parent_(parent),
271           serverlist_(std::move(serverlist)),
272           child_picker_(std::move(child_picker)),
273           client_stats_(std::move(client_stats)) {}
274
275     PickResult Pick(PickArgs args) override;
276
277    private:
278     // Storing the address for logging, but not holding a ref.
279     // DO NOT DEFERENCE!
280     GrpcLb* parent_;
281
282     // Serverlist to be used for determining drops.
283     RefCountedPtr<Serverlist> serverlist_;
284
285     UniquePtr<SubchannelPicker> child_picker_;
286     RefCountedPtr<GrpcLbClientStats> client_stats_;
287   };
288
289   class Helper : public ChannelControlHelper {
290    public:
291     explicit Helper(RefCountedPtr<GrpcLb> parent)
292         : parent_(std::move(parent)) {}
293
294     RefCountedPtr<SubchannelInterface> CreateSubchannel(
295         const grpc_channel_args& args) override;
296     void UpdateState(grpc_connectivity_state state,
297                      UniquePtr<SubchannelPicker> picker) override;
298     void RequestReresolution() override;
299     void AddTraceEvent(TraceSeverity severity, StringView message) override;
300
301     void set_child(LoadBalancingPolicy* child) { child_ = child; }
302
303    private:
304     bool CalledByPendingChild() const;
305     bool CalledByCurrentChild() const;
306
307     RefCountedPtr<GrpcLb> parent_;
308     LoadBalancingPolicy* child_ = nullptr;
309   };
310
311   ~GrpcLb();
312
313   void ShutdownLocked() override;
314
315   // Helper functions used in UpdateLocked().
316   void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
317                                             const grpc_channel_args& args);
318   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
319                                                          grpc_error* error);
320   void CancelBalancerChannelConnectivityWatchLocked();
321
322   // Methods for dealing with fallback state.
323   void MaybeEnterFallbackModeAfterStartup();
324   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
325
326   // Methods for dealing with the balancer call.
327   void StartBalancerCallLocked();
328   void StartBalancerCallRetryTimerLocked();
329   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
330
331   // Methods for dealing with the child policy.
332   grpc_channel_args* CreateChildPolicyArgsLocked(
333       bool is_backend_from_grpclb_load_balancer);
334   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
335       const char* name, const grpc_channel_args* args);
336   void CreateOrUpdateChildPolicyLocked();
337
338   // Who the client is trying to communicate with.
339   const char* server_name_ = nullptr;
340
341   // Current channel args from the resolver.
342   grpc_channel_args* args_ = nullptr;
343
344   // Internal state.
345   bool shutting_down_ = false;
346
347   // The channel for communicating with the LB server.
348   grpc_channel* lb_channel_ = nullptr;
349   // Response generator to inject address updates into lb_channel_.
350   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
351
352   // The data associated with the current LB call. It holds a ref to this LB
353   // policy. It's initialized every time we query for backends. It's reset to
354   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
355   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
356   // contains a non-NULL lb_call_.
357   OrphanablePtr<BalancerCallState> lb_calld_;
358   // Timeout in milliseconds for the LB call. 0 means no deadline.
359   int lb_call_timeout_ms_ = 0;
360   // Balancer call retry state.
361   BackOff lb_call_backoff_;
362   bool retry_timer_callback_pending_ = false;
363   grpc_timer lb_call_retry_timer_;
364   grpc_closure lb_on_call_retry_;
365
366   // The deserialized response from the balancer. May be nullptr until one
367   // such response has arrived.
368   RefCountedPtr<Serverlist> serverlist_;
369
370   // Whether we're in fallback mode.
371   bool fallback_mode_ = false;
372   // The backend addresses from the resolver.
373   ServerAddressList fallback_backend_addresses_;
374   // State for fallback-at-startup checks.
375   // Timeout after startup after which we will go into fallback mode if
376   // we have not received a serverlist from the balancer.
377   int fallback_at_startup_timeout_ = 0;
378   bool fallback_at_startup_checks_pending_ = false;
379   grpc_timer lb_fallback_timer_;
380   grpc_closure lb_on_fallback_;
381   grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
382   grpc_closure lb_channel_on_connectivity_changed_;
383
384   // The child policy to use for the backends.
385   OrphanablePtr<LoadBalancingPolicy> child_policy_;
386   // When switching child policies, the new policy will be stored here
387   // until it reports READY, at which point it will be moved to child_policy_.
388   OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
389   // The child policy config.
390   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
391   // Child policy in state READY.
392   bool child_policy_ready_ = false;
393 };
394
395 //
396 // GrpcLb::Serverlist
397 //
398
399 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
400   return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
401 }
402
403 void ParseServer(const grpc_grpclb_server* server,
404                  grpc_resolved_address* addr) {
405   memset(addr, 0, sizeof(*addr));
406   if (server->drop) return;
407   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
408   /* the addresses are given in binary format (a in(6)_addr struct) in
409    * server->ip_address.bytes. */
410   const grpc_grpclb_server_ip_address& ip = server->ip_address;
411   if (ip.size == 4) {
412     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
413     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
414     addr4->sin_family = GRPC_AF_INET;
415     memcpy(&addr4->sin_addr, ip.data, ip.size);
416     addr4->sin_port = netorder_port;
417   } else if (ip.size == 16) {
418     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
419     grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
420     addr6->sin6_family = GRPC_AF_INET6;
421     memcpy(&addr6->sin6_addr, ip.data, ip.size);
422     addr6->sin6_port = netorder_port;
423   }
424 }
425
426 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
427   gpr_strvec entries;
428   gpr_strvec_init(&entries);
429   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
430     const auto* server = serverlist_->servers[i];
431     char* ipport;
432     if (server->drop) {
433       ipport = gpr_strdup("(drop)");
434     } else {
435       grpc_resolved_address addr;
436       ParseServer(server, &addr);
437       grpc_sockaddr_to_string(&ipport, &addr, false);
438     }
439     char* entry;
440     gpr_asprintf(&entry, "  %" PRIuPTR ": %s token=%s\n", i, ipport,
441                  server->load_balance_token);
442     gpr_free(ipport);
443     gpr_strvec_add(&entries, entry);
444   }
445   UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
446   gpr_strvec_destroy(&entries);
447   return result;
448 }
449
450 // vtables for channel args for LB token and client stats.
451 void* lb_token_copy(void* token) {
452   return gpr_strdup(static_cast<char*>(token));
453 }
454 void lb_token_destroy(void* token) { gpr_free(token); }
455 void* client_stats_copy(void* p) {
456   GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(p);
457   client_stats->Ref().release();
458   return p;
459 }
460 void client_stats_destroy(void* p) {
461   GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(p);
462   client_stats->Unref();
463 }
// Comparison hook shared by both channel arg vtables.  It reports a match
// for every pair of values so that these channel args never influence the
// subchannel's key in the index.
//
// TODO(roth): Is this right?  On the plus side, we do not needlessly
// recreate the subchannel each time the LB token or client stats change
// (i.e., when the balancer call is terminated and reestablished).  On the
// minus side, because the subchannel is not recreated, we never switch over
// to the new LB token or client stats.  It might be better to keep the LB
// token and client stats somewhere other than the subchannel args, e.g. in
// a map that is consulted for each call (provided our Map<> implementation
// is performant enough), or in a subchannel wrapper of our own, although the
// latter would bring a lot of refcounting overhead.  Since we are trying to
// move from grpclb to xds at this point, and no one has actually reported
// any problems with this, we probably won't bother fixing it.
int equal_cmp(void* /*p1*/, void* /*p2*/) {
  constexpr int kAlwaysEqual = 0;
  return kAlwaysEqual;
}
484 const grpc_arg_pointer_vtable lb_token_arg_vtable = {
485     lb_token_copy, lb_token_destroy, equal_cmp};
486 const grpc_arg_pointer_vtable client_stats_arg_vtable = {
487     client_stats_copy, client_stats_destroy, equal_cmp};
488
489 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
490   if (server->drop) return false;
491   const grpc_grpclb_server_ip_address& ip = server->ip_address;
492   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
493     if (log) {
494       gpr_log(GPR_ERROR,
495               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
496               server->port, (unsigned long)idx);
497     }
498     return false;
499   }
500   if (GPR_UNLIKELY(ip.size != 4 && ip.size != 16)) {
501     if (log) {
502       gpr_log(GPR_ERROR,
503               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
504               "serverlist. Ignoring",
505               ip.size, (unsigned long)idx);
506     }
507     return false;
508   }
509   return true;
510 }
511
512 // Returns addresses extracted from the serverlist.
513 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
514     GrpcLbClientStats* client_stats) const {
515   ServerAddressList addresses;
516   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
517     const grpc_grpclb_server* server = serverlist_->servers[i];
518     if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
519     // Address processing.
520     grpc_resolved_address addr;
521     ParseServer(server, &addr);
522     // LB token processing.
523     char lb_token[GPR_ARRAY_SIZE(server->load_balance_token) + 1];
524     if (server->load_balance_token[0] != 0) {
525       const size_t lb_token_max_length =
526           GPR_ARRAY_SIZE(server->load_balance_token);
527       const size_t lb_token_length =
528           strnlen(server->load_balance_token, lb_token_max_length);
529       memcpy(lb_token, server->load_balance_token, lb_token_length);
530       lb_token[lb_token_length] = '\0';
531     } else {
532       char* uri = grpc_sockaddr_to_uri(&addr);
533       gpr_log(GPR_INFO,
534               "Missing LB token for backend address '%s'. The empty token will "
535               "be used instead",
536               uri);
537       gpr_free(uri);
538       lb_token[0] = '\0';
539     }
540     // Add address.
541     InlinedVector<grpc_arg, 2> args_to_add;
542     args_to_add.emplace_back(grpc_channel_arg_pointer_create(
543         const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
544         &lb_token_arg_vtable));
545     if (client_stats != nullptr) {
546       args_to_add.emplace_back(grpc_channel_arg_pointer_create(
547           const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS), client_stats,
548           &client_stats_arg_vtable));
549     }
550     grpc_channel_args* args = grpc_channel_args_copy_and_add(
551         nullptr, args_to_add.data(), args_to_add.size());
552     addresses.emplace_back(addr, args);
553   }
554   return addresses;
555 }
556
557 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
558   if (serverlist_->num_servers == 0) return false;
559   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
560     if (!serverlist_->servers[i]->drop) return false;
561   }
562   return true;
563 }
564
565 const char* GrpcLb::Serverlist::ShouldDrop() {
566   if (serverlist_->num_servers == 0) return nullptr;
567   grpc_grpclb_server* server = serverlist_->servers[drop_index_];
568   drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
569   return server->drop ? server->load_balance_token : nullptr;
570 }
571
572 //
573 // GrpcLb::Picker
574 //
575
576 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
577   PickResult result;
578   // Check if we should drop the call.
579   const char* drop_token = serverlist_->ShouldDrop();
580   if (drop_token != nullptr) {
581     // Update client load reporting stats to indicate the number of
582     // dropped calls.  Note that we have to do this here instead of in
583     // the client_load_reporting filter, because we do not create a
584     // subchannel call (and therefore no client_load_reporting filter)
585     // for dropped calls.
586     if (client_stats_ != nullptr) {
587       client_stats_->AddCallDropped(drop_token);
588     }
589     result.type = PickResult::PICK_COMPLETE;
590     return result;
591   }
592   // Forward pick to child policy.
593   result = child_picker_->Pick(args);
594   // If pick succeeded, add LB token to initial metadata.
595   if (result.type == PickResult::PICK_COMPLETE &&
596       result.subchannel != nullptr) {
597     // Encode client stats object into metadata for use by
598     // client_load_reporting filter.
599     const grpc_arg* arg =
600         grpc_channel_args_find(result.subchannel->channel_args(),
601                                GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS);
602     if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
603         arg->value.pointer.p != nullptr) {
604       GrpcLbClientStats* client_stats =
605           static_cast<GrpcLbClientStats*>(arg->value.pointer.p);
606       client_stats->Ref().release();  // Ref passed via metadata.
607       // The metadata value is a hack: we pretend the pointer points to
608       // a string and rely on the client_load_reporting filter to know
609       // how to interpret it.
610       args.initial_metadata->Add(
611           kGrpcLbClientStatsMetadataKey,
612           StringView(reinterpret_cast<const char*>(client_stats), 0));
613       // Update calls-started.
614       client_stats->AddCallStarted();
615     }
616     // Encode the LB token in metadata.
617     arg = grpc_channel_args_find(result.subchannel->channel_args(),
618                                  GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
619     if (arg == nullptr) {
620       gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
621               parent_, this, result.subchannel.get());
622       abort();
623     }
624     args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey,
625                                static_cast<char*>(arg->value.pointer.p));
626   }
627   return result;
628 }
629
630 //
631 // GrpcLb::Helper
632 //
633
634 bool GrpcLb::Helper::CalledByPendingChild() const {
635   GPR_ASSERT(child_ != nullptr);
636   return child_ == parent_->pending_child_policy_.get();
637 }
638
639 bool GrpcLb::Helper::CalledByCurrentChild() const {
640   GPR_ASSERT(child_ != nullptr);
641   return child_ == parent_->child_policy_.get();
642 }
643
644 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
645     const grpc_channel_args& args) {
646   if (parent_->shutting_down_ ||
647       (!CalledByPendingChild() && !CalledByCurrentChild())) {
648     return nullptr;
649   }
650   return parent_->channel_control_helper()->CreateSubchannel(args);
651 }
652
653 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
654                                  UniquePtr<SubchannelPicker> picker) {
655   if (parent_->shutting_down_) return;
656   // If this request is from the pending child policy, ignore it until
657   // it reports READY, at which point we swap it into place.
658   if (CalledByPendingChild()) {
659     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
660       gpr_log(GPR_INFO,
661               "[grpclb %p helper %p] pending child policy %p reports state=%s",
662               parent_.get(), this, parent_->pending_child_policy_.get(),
663               grpc_connectivity_state_name(state));
664     }
665     if (state != GRPC_CHANNEL_READY) return;
666     grpc_pollset_set_del_pollset_set(
667         parent_->child_policy_->interested_parties(),
668         parent_->interested_parties());
669     parent_->child_policy_ = std::move(parent_->pending_child_policy_);
670   } else if (!CalledByCurrentChild()) {
671     // This request is from an outdated child, so ignore it.
672     return;
673   }
674   // Record whether child policy reports READY.
675   parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
676   // Enter fallback mode if needed.
677   parent_->MaybeEnterFallbackModeAfterStartup();
678   // There are three cases to consider here:
679   // 1. We're in fallback mode.  In this case, we're always going to use
680   //    the child policy's result, so we pass its picker through as-is.
681   // 2. The serverlist contains only drop entries.  In this case, we
682   //    want to use our own picker so that we can return the drops.
683   // 3. Not in fallback mode and serverlist is not all drops (i.e., it
684   //    may be empty or contain at least one backend address).  There are
685   //    two sub-cases:
686   //    a. The child policy is reporting state READY.  In this case, we wrap
687   //       the child's picker in our own, so that we can handle drops and LB
688   //       token metadata for each pick.
689   //    b. The child policy is reporting a state other than READY.  In this
690   //       case, we don't want to use our own picker, because we don't want
691   //       to process drops for picks that yield a QUEUE result; this would
692   //       result in dropping too many calls, since we will see the
693   //       queued picks multiple times, and we'd consider each one a
694   //       separate call for the drop calculation.
695   //
696   // Cases 1 and 3b: return picker from the child policy as-is.
697   if (parent_->serverlist_ == nullptr ||
698       (!parent_->serverlist_->ContainsAllDropEntries() &&
699        state != GRPC_CHANNEL_READY)) {
700     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
701       gpr_log(GPR_INFO,
702               "[grpclb %p helper %p] state=%s passing child picker %p as-is",
703               parent_.get(), this, grpc_connectivity_state_name(state),
704               picker.get());
705     }
706     parent_->channel_control_helper()->UpdateState(state, std::move(picker));
707     return;
708   }
709   // Cases 2 and 3a: wrap picker from the child in our own picker.
710   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
711     gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
712             parent_.get(), this, grpc_connectivity_state_name(state),
713             picker.get());
714   }
715   RefCountedPtr<GrpcLbClientStats> client_stats;
716   if (parent_->lb_calld_ != nullptr &&
717       parent_->lb_calld_->client_stats() != nullptr) {
718     client_stats = parent_->lb_calld_->client_stats()->Ref();
719   }
720   parent_->channel_control_helper()->UpdateState(
721       state, UniquePtr<SubchannelPicker>(
722                  New<Picker>(parent_.get(), parent_->serverlist_,
723                              std::move(picker), std::move(client_stats))));
724 }
725
726 void GrpcLb::Helper::RequestReresolution() {
727   if (parent_->shutting_down_) return;
728   const LoadBalancingPolicy* latest_child_policy =
729       parent_->pending_child_policy_ != nullptr
730           ? parent_->pending_child_policy_.get()
731           : parent_->child_policy_.get();
732   if (child_ != latest_child_policy) return;
733   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
734     gpr_log(GPR_INFO,
735             "[grpclb %p] Re-resolution requested from %schild policy (%p).",
736             parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
737   }
738   // If we are talking to a balancer, we expect to get updated addresses
739   // from the balancer, so we can ignore the re-resolution request from
740   // the child policy. Otherwise, pass the re-resolution request up to the
741   // channel.
742   if (parent_->lb_calld_ == nullptr ||
743       !parent_->lb_calld_->seen_initial_response()) {
744     parent_->channel_control_helper()->RequestReresolution();
745   }
746 }
747
748 void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) {
749   if (parent_->shutting_down_ ||
750       (!CalledByPendingChild() && !CalledByCurrentChild())) {
751     return;
752   }
753   parent_->channel_control_helper()->AddTraceEvent(severity, message);
754 }
755
756 //
757 // GrpcLb::BalancerCallState
758 //
759
760 GrpcLb::BalancerCallState::BalancerCallState(
761     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
762     : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
763       grpclb_policy_(std::move(parent_grpclb_policy)) {
764   GPR_ASSERT(grpclb_policy_ != nullptr);
765   GPR_ASSERT(!grpclb_policy()->shutting_down_);
766   // Init the LB call. Note that the LB call will progress every time there's
767   // activity in grpclb_policy_->interested_parties(), which is comprised of
768   // the polling entities from client_channel.
769   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
770   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
771   const grpc_millis deadline =
772       grpclb_policy()->lb_call_timeout_ms_ == 0
773           ? GRPC_MILLIS_INF_FUTURE
774           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
775   lb_call_ = grpc_channel_create_pollset_set_call(
776       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
777       grpclb_policy_->interested_parties(),
778       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
779       nullptr, deadline, nullptr);
780   // Init the LB call request payload.
781   upb::Arena arena;
782   grpc_grpclb_request* request =
783       grpc_grpclb_request_create(grpclb_policy()->server_name_, arena.ptr());
784   grpc_slice request_payload_slice =
785       grpc_grpclb_request_encode(request, arena.ptr());
786   send_message_payload_ =
787       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
788   grpc_slice_unref_internal(request_payload_slice);
789   // Init other data associated with the LB call.
790   grpc_metadata_array_init(&lb_initial_metadata_recv_);
791   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
792   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
793                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
794   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
795                     OnBalancerMessageReceivedLocked, this,
796                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
797   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
798                     OnBalancerStatusReceivedLocked, this,
799                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
800 }
801
802 GrpcLb::BalancerCallState::~BalancerCallState() {
803   GPR_ASSERT(lb_call_ != nullptr);
804   grpc_call_unref(lb_call_);
805   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
806   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
807   grpc_byte_buffer_destroy(send_message_payload_);
808   grpc_byte_buffer_destroy(recv_message_payload_);
809   grpc_slice_unref_internal(lb_call_status_details_);
810 }
811
812 void GrpcLb::BalancerCallState::Orphan() {
813   GPR_ASSERT(lb_call_ != nullptr);
814   // If we are here because grpclb_policy wants to cancel the call,
815   // lb_on_balancer_status_received_ will complete the cancellation and clean
816   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
817   // call, then the following cancellation will be a no-op.
818   grpc_call_cancel(lb_call_, nullptr);
819   if (client_load_report_timer_callback_pending_) {
820     grpc_timer_cancel(&client_load_report_timer_);
821   }
822   // Note that the initial ref is hold by lb_on_balancer_status_received_
823   // instead of the caller of this function. So the corresponding unref happens
824   // in lb_on_balancer_status_received_ instead of here.
825 }
826
827 void GrpcLb::BalancerCallState::StartQuery() {
828   GPR_ASSERT(lb_call_ != nullptr);
829   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
830     gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
831             grpclb_policy_.get(), this, lb_call_);
832   }
833   // Create the ops.
834   grpc_call_error call_error;
835   grpc_op ops[3];
836   memset(ops, 0, sizeof(ops));
837   // Op: send initial metadata.
838   grpc_op* op = ops;
839   op->op = GRPC_OP_SEND_INITIAL_METADATA;
840   op->data.send_initial_metadata.count = 0;
841   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
842               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
843   op->reserved = nullptr;
844   op++;
845   // Op: send request message.
846   GPR_ASSERT(send_message_payload_ != nullptr);
847   op->op = GRPC_OP_SEND_MESSAGE;
848   op->data.send_message.send_message = send_message_payload_;
849   op->flags = 0;
850   op->reserved = nullptr;
851   op++;
852   // TODO(roth): We currently track this ref manually.  Once the
853   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
854   // with the callback.
855   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
856   self.release();
857   call_error = grpc_call_start_batch_and_execute(
858       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
859   GPR_ASSERT(GRPC_CALL_OK == call_error);
860   // Op: recv initial metadata.
861   op = ops;
862   op->op = GRPC_OP_RECV_INITIAL_METADATA;
863   op->data.recv_initial_metadata.recv_initial_metadata =
864       &lb_initial_metadata_recv_;
865   op->flags = 0;
866   op->reserved = nullptr;
867   op++;
868   // Op: recv response.
869   op->op = GRPC_OP_RECV_MESSAGE;
870   op->data.recv_message.recv_message = &recv_message_payload_;
871   op->flags = 0;
872   op->reserved = nullptr;
873   op++;
874   // TODO(roth): We currently track this ref manually.  Once the
875   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
876   // with the callback.
877   self = Ref(DEBUG_LOCATION, "on_message_received");
878   self.release();
879   call_error = grpc_call_start_batch_and_execute(
880       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
881   GPR_ASSERT(GRPC_CALL_OK == call_error);
882   // Op: recv server status.
883   op = ops;
884   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
885   op->data.recv_status_on_client.trailing_metadata =
886       &lb_trailing_metadata_recv_;
887   op->data.recv_status_on_client.status = &lb_call_status_;
888   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
889   op->flags = 0;
890   op->reserved = nullptr;
891   op++;
892   // This callback signals the end of the LB call, so it relies on the initial
893   // ref instead of a new ref. When it's invoked, it's the initial ref that is
894   // unreffed.
895   call_error = grpc_call_start_batch_and_execute(
896       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
897   GPR_ASSERT(GRPC_CALL_OK == call_error);
898 }
899
900 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
901   const grpc_millis next_client_load_report_time =
902       ExecCtx::Get()->Now() + client_stats_report_interval_;
903   GRPC_CLOSURE_INIT(&client_load_report_closure_,
904                     MaybeSendClientLoadReportLocked, this,
905                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
906   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
907                   &client_load_report_closure_);
908   client_load_report_timer_callback_pending_ = true;
909 }
910
911 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
912     void* arg, grpc_error* error) {
913   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
914   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
915   lb_calld->client_load_report_timer_callback_pending_ = false;
916   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
917     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
918     return;
919   }
920   // If we've already sent the initial request, then we can go ahead and send
921   // the load report. Otherwise, we need to wait until the initial request has
922   // been sent to send this (see OnInitialRequestSentLocked()).
923   if (lb_calld->send_message_payload_ == nullptr) {
924     lb_calld->SendClientLoadReportLocked();
925   } else {
926     lb_calld->client_load_report_is_due_ = true;
927   }
928 }
929
930 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
931     grpc_grpclb_request* request) {
932   const grpc_lb_v1_ClientStats* cstats =
933       grpc_lb_v1_LoadBalanceRequest_client_stats(request);
934   if (cstats == nullptr) {
935     return true;
936   }
937   size_t drop_count;
938   grpc_lb_v1_ClientStats_calls_finished_with_drop(cstats, &drop_count);
939   return grpc_lb_v1_ClientStats_num_calls_started(cstats) == 0 &&
940          grpc_lb_v1_ClientStats_num_calls_finished(cstats) == 0 &&
941          grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send(
942              cstats) == 0 &&
943          grpc_lb_v1_ClientStats_num_calls_finished_known_received(cstats) ==
944              0 &&
945          drop_count == 0;
946 }
947
948 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
949   // Construct message payload.
950   GPR_ASSERT(send_message_payload_ == nullptr);
951   upb::Arena arena;
952   grpc_grpclb_request* request =
953       grpc_grpclb_load_report_request_create(client_stats_.get(), arena.ptr());
954   // Skip client load report if the counters were all zero in the last
955   // report and they are still zero in this one.
956   if (LoadReportCountersAreZero(request)) {
957     if (last_client_load_report_counters_were_zero_) {
958       ScheduleNextClientLoadReportLocked();
959       return;
960     }
961     last_client_load_report_counters_were_zero_ = true;
962   } else {
963     last_client_load_report_counters_were_zero_ = false;
964   }
965   grpc_slice request_payload_slice =
966       grpc_grpclb_request_encode(request, arena.ptr());
967   send_message_payload_ =
968       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
969   grpc_slice_unref_internal(request_payload_slice);
970   // Send the report.
971   grpc_op op;
972   memset(&op, 0, sizeof(op));
973   op.op = GRPC_OP_SEND_MESSAGE;
974   op.data.send_message.send_message = send_message_payload_;
975   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
976                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
977   grpc_call_error call_error = grpc_call_start_batch_and_execute(
978       lb_call_, &op, 1, &client_load_report_closure_);
979   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
980     gpr_log(GPR_ERROR,
981             "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
982             grpclb_policy_.get(), this, call_error);
983     GPR_ASSERT(GRPC_CALL_OK == call_error);
984   }
985 }
986
987 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
988                                                            grpc_error* error) {
989   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
990   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
991   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
992   lb_calld->send_message_payload_ = nullptr;
993   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
994     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
995     return;
996   }
997   lb_calld->ScheduleNextClientLoadReportLocked();
998 }
999
1000 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
1001                                                            grpc_error* error) {
1002   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1003   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
1004   lb_calld->send_message_payload_ = nullptr;
1005   // If we attempted to send a client load report before the initial request was
1006   // sent (and this lb_calld is still in use), send the load report now.
1007   if (lb_calld->client_load_report_is_due_ &&
1008       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
1009     lb_calld->SendClientLoadReportLocked();
1010     lb_calld->client_load_report_is_due_ = false;
1011   }
1012   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
1013 }
1014
1015 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
1016     void* arg, grpc_error* error) {
1017   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1018   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1019   // Null payload means the LB call was cancelled.
1020   if (lb_calld != grpclb_policy->lb_calld_.get() ||
1021       lb_calld->recv_message_payload_ == nullptr) {
1022     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
1023     return;
1024   }
1025   grpc_byte_buffer_reader bbr;
1026   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
1027   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1028   grpc_byte_buffer_reader_destroy(&bbr);
1029   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
1030   lb_calld->recv_message_payload_ = nullptr;
1031   const grpc_grpclb_initial_response* initial_response;
1032   grpc_grpclb_serverlist* serverlist;
1033   upb::Arena arena;
1034   if (!lb_calld->seen_initial_response_ &&
1035       (initial_response = grpc_grpclb_initial_response_parse(
1036            response_slice, arena.ptr())) != nullptr) {
1037     // Have NOT seen initial response, look for initial response.
1038     const google_protobuf_Duration* client_stats_report_interval =
1039         grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
1040             initial_response);
1041     if (client_stats_report_interval != nullptr) {
1042       lb_calld->client_stats_report_interval_ =
1043           GPR_MAX(GPR_MS_PER_SEC,
1044                   grpc_grpclb_duration_to_millis(client_stats_report_interval));
1045       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1046         gpr_log(GPR_INFO,
1047                 "[grpclb %p] lb_calld=%p: Received initial LB response "
1048                 "message; client load reporting interval = %" PRId64
1049                 " milliseconds",
1050                 grpclb_policy, lb_calld,
1051                 lb_calld->client_stats_report_interval_);
1052       }
1053     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1054       gpr_log(GPR_INFO,
1055               "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1056               "client load reporting NOT enabled",
1057               grpclb_policy, lb_calld);
1058     }
1059     lb_calld->seen_initial_response_ = true;
1060   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1061                   response_slice)) != nullptr) {
1062     // Have seen initial response, look for serverlist.
1063     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1064     auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1065     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1066       UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1067       gpr_log(GPR_INFO,
1068               "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1069               " servers received:\n%s",
1070               grpclb_policy, lb_calld, serverlist->num_servers,
1071               serverlist_text.get());
1072     }
1073     lb_calld->seen_serverlist_ = true;
1074     // Start sending client load report only after we start using the
1075     // serverlist returned from the current LB call.
1076     if (lb_calld->client_stats_report_interval_ > 0 &&
1077         lb_calld->client_stats_ == nullptr) {
1078       lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1079       // Ref held by callback.
1080       lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1081       lb_calld->ScheduleNextClientLoadReportLocked();
1082     }
1083     // Check if the serverlist differs from the previous one.
1084     if (grpclb_policy->serverlist_ != nullptr &&
1085         *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1086       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1087         gpr_log(GPR_INFO,
1088                 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1089                 "current, ignoring.",
1090                 grpclb_policy, lb_calld);
1091       }
1092     } else {  // New serverlist.
1093       // Dispose of the fallback.
1094       // TODO(roth): Ideally, we should stay in fallback mode until we
1095       // know that we can reach at least one of the backends in the new
1096       // serverlist.  Unfortunately, we can't do that, since we need to
1097       // send the new addresses to the child policy in order to determine
1098       // if they are reachable, and if we don't exit fallback mode now,
1099       // CreateOrUpdateChildPolicyLocked() will use the fallback
1100       // addresses instead of the addresses from the new serverlist.
1101       // However, if we can't reach any of the servers in the new
1102       // serverlist, then the child policy will never switch away from
1103       // the fallback addresses, but the grpclb policy will still think
1104       // that we're not in fallback mode, which means that we won't send
1105       // updates to the child policy when the fallback addresses are
1106       // updated by the resolver.  This is sub-optimal, but the only way
1107       // to fix it is to maintain a completely separate child policy for
1108       // fallback mode, and that's more work than we want to put into
1109       // the grpclb implementation at this point, since we're deprecating
1110       // it in favor of the xds policy.  We will implement this the
1111       // right way in the xds policy instead.
1112       if (grpclb_policy->fallback_mode_) {
1113         gpr_log(GPR_INFO,
1114                 "[grpclb %p] Received response from balancer; exiting "
1115                 "fallback mode",
1116                 grpclb_policy);
1117         grpclb_policy->fallback_mode_ = false;
1118       }
1119       if (grpclb_policy->fallback_at_startup_checks_pending_) {
1120         grpclb_policy->fallback_at_startup_checks_pending_ = false;
1121         grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1122         grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1123       }
1124       // Update the serverlist in the GrpcLb instance. This serverlist
1125       // instance will be destroyed either upon the next update or when the
1126       // GrpcLb instance is destroyed.
1127       grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1128       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1129     }
1130   } else {
1131     // No valid initial response or serverlist found.
1132     char* response_slice_str =
1133         grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1134     gpr_log(GPR_ERROR,
1135             "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1136             "Ignoring.",
1137             grpclb_policy, lb_calld, response_slice_str);
1138     gpr_free(response_slice_str);
1139   }
1140   grpc_slice_unref_internal(response_slice);
1141   if (!grpclb_policy->shutting_down_) {
1142     // Keep listening for serverlist updates.
1143     grpc_op op;
1144     memset(&op, 0, sizeof(op));
1145     op.op = GRPC_OP_RECV_MESSAGE;
1146     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1147     op.flags = 0;
1148     op.reserved = nullptr;
1149     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1150     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1151         lb_calld->lb_call_, &op, 1,
1152         &lb_calld->lb_on_balancer_message_received_);
1153     GPR_ASSERT(GRPC_CALL_OK == call_error);
1154   } else {
1155     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1156   }
1157 }
1158
1159 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1160     void* arg, grpc_error* error) {
1161   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1162   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1163   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1164   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1165     char* status_details =
1166         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1167     gpr_log(GPR_INFO,
1168             "[grpclb %p] lb_calld=%p: Status from LB server received. "
1169             "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1170             grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1171             lb_calld->lb_call_, grpc_error_string(error));
1172     gpr_free(status_details);
1173   }
1174   // If this lb_calld is still in use, this call ended because of a failure so
1175   // we want to retry connecting. Otherwise, we have deliberately ended this
1176   // call and no further action is required.
1177   if (lb_calld == grpclb_policy->lb_calld_.get()) {
1178     // If the fallback-at-startup checks are pending, go into fallback mode
1179     // immediately.  This short-circuits the timeout for the fallback-at-startup
1180     // case.
1181     if (grpclb_policy->fallback_at_startup_checks_pending_) {
1182       GPR_ASSERT(!lb_calld->seen_serverlist_);
1183       gpr_log(GPR_INFO,
1184               "[grpclb %p] Balancer call finished without receiving "
1185               "serverlist; entering fallback mode",
1186               grpclb_policy);
1187       grpclb_policy->fallback_at_startup_checks_pending_ = false;
1188       grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1189       grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1190       grpclb_policy->fallback_mode_ = true;
1191       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1192     } else {
1193       // This handles the fallback-after-startup case.
1194       grpclb_policy->MaybeEnterFallbackModeAfterStartup();
1195     }
1196     grpclb_policy->lb_calld_.reset();
1197     GPR_ASSERT(!grpclb_policy->shutting_down_);
1198     grpclb_policy->channel_control_helper()->RequestReresolution();
1199     if (lb_calld->seen_initial_response_) {
1200       // If we lose connection to the LB server, reset the backoff and restart
1201       // the LB call immediately.
1202       grpclb_policy->lb_call_backoff_.Reset();
1203       grpclb_policy->StartBalancerCallLocked();
1204     } else {
1205       // If this LB call fails establishing any connection to the LB server,
1206       // retry later.
1207       grpclb_policy->StartBalancerCallRetryTimerLocked();
1208     }
1209   }
1210   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1211 }
1212
1213 //
1214 // helper code for creating balancer channel
1215 //
1216
1217 ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
1218   ServerAddressList balancer_addresses;
1219   for (size_t i = 0; i < addresses.size(); ++i) {
1220     if (addresses[i].IsBalancer()) {
1221       // Strip out the is_balancer channel arg, since we don't want to
1222       // recursively use the grpclb policy in the channel used to talk to
1223       // the balancers.  Note that we do NOT strip out the balancer_name
1224       // channel arg, since we need that to set the authority correctly
1225       // to talk to the balancers.
1226       static const char* args_to_remove[] = {
1227           GRPC_ARG_ADDRESS_IS_BALANCER,
1228       };
1229       balancer_addresses.emplace_back(
1230           addresses[i].address(),
1231           grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
1232                                             GPR_ARRAY_SIZE(args_to_remove)));
1233     }
1234   }
1235   return balancer_addresses;
1236 }
1237
1238 /* Returns the channel args for the LB channel, used to create a bidirectional
1239  * stream for the reception of load balancing updates.
1240  *
1241  * Inputs:
1242  *   - \a addresses: corresponding to the balancers.
1243  *   - \a response_generator: in order to propagate updates from the resolver
1244  *   above the grpclb policy.
1245  *   - \a args: other args inherited from the grpclb policy. */
1246 grpc_channel_args* BuildBalancerChannelArgs(
1247     const ServerAddressList& addresses,
1248     FakeResolverResponseGenerator* response_generator,
1249     const grpc_channel_args* args) {
1250   // Channel args to remove.
1251   static const char* args_to_remove[] = {
1252       // LB policy name, since we want to use the default (pick_first) in
1253       // the LB channel.
1254       GRPC_ARG_LB_POLICY_NAME,
1255       // Strip out the service config, since we don't want the LB policy
1256       // config specified for the parent channel to affect the LB channel.
1257       GRPC_ARG_SERVICE_CONFIG,
1258       // The channel arg for the server URI, since that will be different for
1259       // the LB channel than for the parent channel.  The client channel
1260       // factory will re-add this arg with the right value.
1261       GRPC_ARG_SERVER_URI,
1262       // The fake resolver response generator, because we are replacing it
1263       // with the one from the grpclb policy, used to propagate updates to
1264       // the LB channel.
1265       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1266       // The LB channel should use the authority indicated by the target
1267       // authority table (see \a ModifyGrpclbBalancerChannelArgs),
1268       // as opposed to the authority from the parent channel.
1269       GRPC_ARG_DEFAULT_AUTHORITY,
1270       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1271       // treated as a stand-alone channel and not inherit this argument from the
1272       // args of the parent channel.
1273       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1274       // Don't want to pass down channelz node from parent; the balancer
1275       // channel will get its own.
1276       GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1277   };
1278   // Channel args to add.
1279   InlinedVector<grpc_arg, 3> args_to_add;
1280   // The fake resolver response generator, which we use to inject
1281   // address updates into the LB channel.
1282   args_to_add.emplace_back(
1283       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1284           response_generator));
1285   // A channel arg indicating the target is a grpclb load balancer.
1286   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1287       const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1));
1288   // The parent channel's channelz uuid.
1289   channelz::ChannelNode* channelz_node = nullptr;
1290   const grpc_arg* arg =
1291       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1292   if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
1293       arg->value.pointer.p != nullptr) {
1294     channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1295     args_to_add.emplace_back(
1296         channelz::MakeParentUuidArg(channelz_node->uuid()));
1297   }
1298   // Construct channel args.
1299   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1300       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1301       args_to_add.size());
1302   // Make any necessary modifications for security.
1303   return ModifyGrpclbBalancerChannelArgs(addresses, new_args);
1304 }
1305
1306 //
1307 // ctor and dtor
1308 //
1309
1310 GrpcLb::GrpcLb(Args args)
1311     : LoadBalancingPolicy(std::move(args)),
1312       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1313       lb_call_backoff_(
1314           BackOff::Options()
1315               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1316                                    1000)
1317               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1318               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1319               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1320                                1000)) {
1321   // Initialization.
1322   GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1323                     grpc_combiner_scheduler(combiner()));
1324   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1325                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1326                     grpc_combiner_scheduler(args.combiner));
1327   // Record server name.
1328   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1329   const char* server_uri = grpc_channel_arg_get_string(arg);
1330   GPR_ASSERT(server_uri != nullptr);
1331   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1332   GPR_ASSERT(uri->path[0] != '\0');
1333   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1334   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1335     gpr_log(GPR_INFO,
1336             "[grpclb %p] Will use '%s' as the server name for LB request.",
1337             this, server_name_);
1338   }
1339   grpc_uri_destroy(uri);
1340   // Record LB call timeout.
1341   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1342   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1343   // Record fallback-at-startup timeout.
1344   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1345   fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1346       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1347 }
1348
1349 GrpcLb::~GrpcLb() {
1350   gpr_free((void*)server_name_);
1351   grpc_channel_args_destroy(args_);
1352 }
1353
1354 void GrpcLb::ShutdownLocked() {
1355   shutting_down_ = true;
1356   lb_calld_.reset();
1357   if (retry_timer_callback_pending_) {
1358     grpc_timer_cancel(&lb_call_retry_timer_);
1359   }
1360   if (fallback_at_startup_checks_pending_) {
1361     grpc_timer_cancel(&lb_fallback_timer_);
1362     CancelBalancerChannelConnectivityWatchLocked();
1363   }
1364   if (child_policy_ != nullptr) {
1365     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1366                                      interested_parties());
1367   }
1368   if (pending_child_policy_ != nullptr) {
1369     grpc_pollset_set_del_pollset_set(
1370         pending_child_policy_->interested_parties(), interested_parties());
1371   }
1372   child_policy_.reset();
1373   pending_child_policy_.reset();
1374   // We destroy the LB channel here instead of in our destructor because
1375   // destroying the channel triggers a last callback to
1376   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1377   // alive when that callback is invoked.
1378   if (lb_channel_ != nullptr) {
1379     grpc_channel_destroy(lb_channel_);
1380     lb_channel_ = nullptr;
1381   }
1382 }
1383
1384 //
1385 // public methods
1386 //
1387
1388 void GrpcLb::ResetBackoffLocked() {
1389   if (lb_channel_ != nullptr) {
1390     grpc_channel_reset_connect_backoff(lb_channel_);
1391   }
1392   if (child_policy_ != nullptr) {
1393     child_policy_->ResetBackoffLocked();
1394   }
1395   if (pending_child_policy_ != nullptr) {
1396     pending_child_policy_->ResetBackoffLocked();
1397   }
1398 }
1399
1400 void GrpcLb::UpdateLocked(UpdateArgs args) {
1401   const bool is_initial_update = lb_channel_ == nullptr;
1402   auto* grpclb_config =
1403       static_cast<const ParsedGrpcLbConfig*>(args.config.get());
1404   if (grpclb_config != nullptr) {
1405     child_policy_config_ = grpclb_config->child_policy();
1406   } else {
1407     child_policy_config_ = nullptr;
1408   }
1409   ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1410   // Update the existing child policy.
1411   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1412   // If this is the initial update, start the fallback-at-startup checks
1413   // and the balancer call.
1414   if (is_initial_update) {
1415     fallback_at_startup_checks_pending_ = true;
1416     // Start timer.
1417     grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1418     Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
1419     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1420     // Start watching the channel's connectivity state.  If the channel
1421     // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1422     // fallback mode even if the fallback timeout has not elapsed.
1423     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1424         grpc_channel_get_channel_stack(lb_channel_));
1425     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1426     // Ref held by callback.
1427     Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1428     grpc_client_channel_watch_connectivity_state(
1429         client_channel_elem,
1430         grpc_polling_entity_create_from_pollset_set(interested_parties()),
1431         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1432         nullptr);
1433     // Start balancer call.
1434     StartBalancerCallLocked();
1435   }
1436 }
1437
1438 //
1439 // helpers for UpdateLocked()
1440 //
1441
1442 // Returns the backend addresses extracted from the given addresses.
1443 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1444   static const char* lb_token = "";
1445   grpc_arg arg = grpc_channel_arg_pointer_create(
1446       const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
1447       const_cast<char*>(lb_token), &lb_token_arg_vtable);
1448   ServerAddressList backend_addresses;
1449   for (size_t i = 0; i < addresses.size(); ++i) {
1450     if (!addresses[i].IsBalancer()) {
1451       backend_addresses.emplace_back(
1452           addresses[i].address(),
1453           grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1454     }
1455   }
1456   return backend_addresses;
1457 }
1458
1459 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1460     const ServerAddressList& addresses, const grpc_channel_args& args) {
1461   // Update fallback address list.
1462   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1463   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1464   // since we use this to trigger the client_load_reporting filter.
1465   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1466   grpc_arg new_arg = grpc_channel_arg_string_create(
1467       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1468   grpc_channel_args_destroy(args_);
1469   args_ = grpc_channel_args_copy_and_add_and_remove(
1470       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1471   // Construct args for balancer channel.
1472   ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1473   grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1474       balancer_addresses, response_generator_.get(), &args);
1475   // Create balancer channel if needed.
1476   if (lb_channel_ == nullptr) {
1477     char* uri_str;
1478     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1479     lb_channel_ = CreateGrpclbBalancerChannel(uri_str, *lb_channel_args);
1480     GPR_ASSERT(lb_channel_ != nullptr);
1481     gpr_free(uri_str);
1482   }
1483   // Propagate updates to the LB channel (pick_first) through the fake
1484   // resolver.
1485   Resolver::Result result;
1486   result.addresses = std::move(balancer_addresses);
1487   result.args = lb_channel_args;
1488   response_generator_->SetResponse(std::move(result));
1489 }
1490
1491 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1492                                                         grpc_error* error) {
1493   GrpcLb* self = static_cast<GrpcLb*>(arg);
1494   if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1495     if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1496       // Not in TRANSIENT_FAILURE.  Renew connectivity watch.
1497       grpc_channel_element* client_channel_elem =
1498           grpc_channel_stack_last_element(
1499               grpc_channel_get_channel_stack(self->lb_channel_));
1500       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1501       grpc_client_channel_watch_connectivity_state(
1502           client_channel_elem,
1503           grpc_polling_entity_create_from_pollset_set(
1504               self->interested_parties()),
1505           &self->lb_channel_connectivity_,
1506           &self->lb_channel_on_connectivity_changed_, nullptr);
1507       return;  // Early out so we don't drop the ref below.
1508     }
1509     // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
1510     // fallback mode immediately.
1511     gpr_log(GPR_INFO,
1512             "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1513             "entering fallback mode",
1514             self);
1515     self->fallback_at_startup_checks_pending_ = false;
1516     grpc_timer_cancel(&self->lb_fallback_timer_);
1517     self->fallback_mode_ = true;
1518     self->CreateOrUpdateChildPolicyLocked();
1519   }
1520   // Done watching connectivity state, so drop ref.
1521   self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1522 }
1523
1524 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1525   grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1526       grpc_channel_get_channel_stack(lb_channel_));
1527   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1528   grpc_client_channel_watch_connectivity_state(
1529       client_channel_elem,
1530       grpc_polling_entity_create_from_pollset_set(interested_parties()),
1531       nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1532 }
1533
1534 //
1535 // code for balancer channel and call
1536 //
1537
1538 void GrpcLb::StartBalancerCallLocked() {
1539   GPR_ASSERT(lb_channel_ != nullptr);
1540   if (shutting_down_) return;
1541   // Init the LB call data.
1542   GPR_ASSERT(lb_calld_ == nullptr);
1543   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1544   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1545     gpr_log(GPR_INFO,
1546             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1547             this, lb_channel_, lb_calld_.get());
1548   }
1549   lb_calld_->StartQuery();
1550 }
1551
1552 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1553   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1554   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1555     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1556     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1557     if (timeout > 0) {
1558       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1559               this, timeout);
1560     } else {
1561       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1562               this);
1563     }
1564   }
1565   // TODO(roth): We currently track this ref manually.  Once the
1566   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1567   // with the callback.
1568   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1569   self.release();
1570   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1571                     this, grpc_combiner_scheduler(combiner()));
1572   retry_timer_callback_pending_ = true;
1573   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1574 }
1575
1576 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1577   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1578   grpclb_policy->retry_timer_callback_pending_ = false;
1579   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1580       grpclb_policy->lb_calld_ == nullptr) {
1581     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1582       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1583               grpclb_policy);
1584     }
1585     grpclb_policy->StartBalancerCallLocked();
1586   }
1587   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1588 }
1589
1590 //
1591 // code for handling fallback mode
1592 //
1593
1594 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1595   // Enter fallback mode if all of the following are true:
1596   // - We are not currently in fallback mode.
1597   // - We are not currently waiting for the initial fallback timeout.
1598   // - We are not currently in contact with the balancer.
1599   // - The child policy is not in state READY.
1600   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1601       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1602       !child_policy_ready_) {
1603     gpr_log(GPR_INFO,
1604             "[grpclb %p] lost contact with balancer and backends from "
1605             "most recent serverlist; entering fallback mode",
1606             this);
1607     fallback_mode_ = true;
1608     CreateOrUpdateChildPolicyLocked();
1609   }
1610 }
1611
1612 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1613   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1614   // If we receive a serverlist after the timer fires but before this callback
1615   // actually runs, don't fall back.
1616   if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1617       !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1618     gpr_log(GPR_INFO,
1619             "[grpclb %p] No response from balancer after fallback timeout; "
1620             "entering fallback mode",
1621             grpclb_policy);
1622     grpclb_policy->fallback_at_startup_checks_pending_ = false;
1623     grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1624     grpclb_policy->fallback_mode_ = true;
1625     grpclb_policy->CreateOrUpdateChildPolicyLocked();
1626   }
1627   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1628 }
1629
1630 //
1631 // code for interacting with the child policy
1632 //
1633
1634 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1635     bool is_backend_from_grpclb_load_balancer) {
1636   InlinedVector<grpc_arg, 2> args_to_add;
1637   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1638       const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1639       is_backend_from_grpclb_load_balancer));
1640   if (is_backend_from_grpclb_load_balancer) {
1641     args_to_add.emplace_back(grpc_channel_arg_integer_create(
1642         const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
1643   }
1644   return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
1645                                         args_to_add.size());
1646 }
1647
1648 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1649     const char* name, const grpc_channel_args* args) {
1650   Helper* helper = New<Helper>(Ref());
1651   LoadBalancingPolicy::Args lb_policy_args;
1652   lb_policy_args.combiner = combiner();
1653   lb_policy_args.args = args;
1654   lb_policy_args.channel_control_helper =
1655       UniquePtr<ChannelControlHelper>(helper);
1656   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1657       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1658           name, std::move(lb_policy_args));
1659   if (GPR_UNLIKELY(lb_policy == nullptr)) {
1660     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1661             name);
1662     return nullptr;
1663   }
1664   helper->set_child(lb_policy.get());
1665   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1666     gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1667             name, lb_policy.get());
1668   }
1669   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1670   // created child policy. This will make the child policy progress upon
1671   // activity on gRPC LB, which in turn is tied to the application's call.
1672   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1673                                    interested_parties());
1674   return lb_policy;
1675 }
1676
1677 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1678   if (shutting_down_) return;
1679   // Construct update args.
1680   UpdateArgs update_args;
1681   bool is_backend_from_grpclb_load_balancer = false;
1682   if (fallback_mode_) {
1683     // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1684     // received any serverlist from the balancer, we use the fallback backends
1685     // returned by the resolver. Note that the fallback backend list may be
1686     // empty, in which case the new round_robin policy will keep the requested
1687     // picks pending.
1688     update_args.addresses = fallback_backend_addresses_;
1689   } else {
1690     update_args.addresses = serverlist_->GetServerAddressList(
1691         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1692     is_backend_from_grpclb_load_balancer = true;
1693   }
1694   update_args.args =
1695       CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1696   GPR_ASSERT(update_args.args != nullptr);
1697   update_args.config = child_policy_config_;
1698   // If the child policy name changes, we need to create a new child
1699   // policy.  When this happens, we leave child_policy_ as-is and store
1700   // the new child policy in pending_child_policy_.  Once the new child
1701   // policy transitions into state READY, we swap it into child_policy_,
1702   // replacing the original child policy.  So pending_child_policy_ is
1703   // non-null only between when we apply an update that changes the child
1704   // policy name and when the new child reports state READY.
1705   //
1706   // Updates can arrive at any point during this transition.  We always
1707   // apply updates relative to the most recently created child policy,
1708   // even if the most recent one is still in pending_child_policy_.  This
1709   // is true both when applying the updates to an existing child policy
1710   // and when determining whether we need to create a new policy.
1711   //
1712   // As a result of this, there are several cases to consider here:
1713   //
1714   // 1. We have no existing child policy (i.e., we have started up but
1715   //    have not yet received a serverlist from the balancer or gone
1716   //    into fallback mode; in this case, both child_policy_ and
1717   //    pending_child_policy_ are null).  In this case, we create a
1718   //    new child policy and store it in child_policy_.
1719   //
1720   // 2. We have an existing child policy and have no pending child policy
1721   //    from a previous update (i.e., either there has not been a
1722   //    previous update that changed the policy name, or we have already
1723   //    finished swapping in the new policy; in this case, child_policy_
1724   //    is non-null but pending_child_policy_ is null).  In this case:
1725   //    a. If child_policy_->name() equals child_policy_name, then we
1726   //       update the existing child policy.
1727   //    b. If child_policy_->name() does not equal child_policy_name,
1728   //       we create a new policy.  The policy will be stored in
1729   //       pending_child_policy_ and will later be swapped into
1730   //       child_policy_ by the helper when the new child transitions
1731   //       into state READY.
1732   //
1733   // 3. We have an existing child policy and have a pending child policy
1734   //    from a previous update (i.e., a previous update set
1735   //    pending_child_policy_ as per case 2b above and that policy has
1736   //    not yet transitioned into state READY and been swapped into
1737   //    child_policy_; in this case, both child_policy_ and
1738   //    pending_child_policy_ are non-null).  In this case:
1739   //    a. If pending_child_policy_->name() equals child_policy_name,
1740   //       then we update the existing pending child policy.
1741   //    b. If pending_child_policy->name() does not equal
1742   //       child_policy_name, then we create a new policy.  The new
1743   //       policy is stored in pending_child_policy_ (replacing the one
1744   //       that was there before, which will be immediately shut down)
1745   //       and will later be swapped into child_policy_ by the helper
1746   //       when the new child transitions into state READY.
1747   const char* child_policy_name = child_policy_config_ == nullptr
1748                                       ? "round_robin"
1749                                       : child_policy_config_->name();
1750   const bool create_policy =
1751       // case 1
1752       child_policy_ == nullptr ||
1753       // case 2b
1754       (pending_child_policy_ == nullptr &&
1755        strcmp(child_policy_->name(), child_policy_name) != 0) ||
1756       // case 3b
1757       (pending_child_policy_ != nullptr &&
1758        strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1759   LoadBalancingPolicy* policy_to_update = nullptr;
1760   if (create_policy) {
1761     // Cases 1, 2b, and 3b: create a new child policy.
1762     // If child_policy_ is null, we set it (case 1), else we set
1763     // pending_child_policy_ (cases 2b and 3b).
1764     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1765       gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1766               child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1767     }
1768     // Swap the policy into place.
1769     auto& lb_policy =
1770         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1771     lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
1772     policy_to_update = lb_policy.get();
1773   } else {
1774     // Cases 2a and 3a: update an existing policy.
1775     // If we have a pending child policy, send the update to the pending
1776     // policy (case 3a), else send it to the current policy (case 2a).
1777     policy_to_update = pending_child_policy_ != nullptr
1778                            ? pending_child_policy_.get()
1779                            : child_policy_.get();
1780   }
1781   GPR_ASSERT(policy_to_update != nullptr);
1782   // Update the policy.
1783   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1784     gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1785             policy_to_update == pending_child_policy_.get() ? "pending " : "",
1786             policy_to_update);
1787   }
1788   policy_to_update->UpdateLocked(std::move(update_args));
1789 }
1790
1791 //
1792 // factory
1793 //
1794
1795 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1796  public:
1797   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1798       LoadBalancingPolicy::Args args) const override {
1799     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1800   }
1801
1802   const char* name() const override { return kGrpclb; }
1803
1804   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1805       const grpc_json* json, grpc_error** error) const override {
1806     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1807     if (json == nullptr) {
1808       return RefCountedPtr<LoadBalancingPolicy::Config>(
1809           New<ParsedGrpcLbConfig>(nullptr));
1810     }
1811     InlinedVector<grpc_error*, 2> error_list;
1812     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1813     for (const grpc_json* field = json->child; field != nullptr;
1814          field = field->next) {
1815       if (field->key == nullptr) continue;
1816       if (strcmp(field->key, "childPolicy") == 0) {
1817         if (child_policy != nullptr) {
1818           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1819               "field:childPolicy error:Duplicate entry"));
1820         }
1821         grpc_error* parse_error = GRPC_ERROR_NONE;
1822         child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1823             field, &parse_error);
1824         if (parse_error != GRPC_ERROR_NONE) {
1825           error_list.push_back(parse_error);
1826         }
1827       }
1828     }
1829     if (error_list.empty()) {
1830       return RefCountedPtr<LoadBalancingPolicy::Config>(
1831           New<ParsedGrpcLbConfig>(std::move(child_policy)));
1832     } else {
1833       *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
1834       return nullptr;
1835     }
1836   }
1837 };
1838
1839 }  // namespace
1840
1841 }  // namespace grpc_core
1842
1843 //
1844 // Plugin registration
1845 //
1846
1847 namespace {
1848
1849 // Only add client_load_reporting filter if the grpclb LB policy is used.
1850 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1851                                             void* arg) {
1852   const grpc_channel_args* args =
1853       grpc_channel_stack_builder_get_channel_arguments(builder);
1854   const grpc_arg* channel_arg =
1855       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1856   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1857       strcmp(channel_arg->value.string, "grpclb") == 0) {
1858     // TODO(roth): When we get around to re-attempting
1859     // https://github.com/grpc/grpc/pull/16214, we should try to keep
1860     // this filter at the very top of the subchannel stack, since that
1861     // will minimize the number of metadata elements that the filter
1862     // needs to iterate through to find the ClientStats object.
1863     return grpc_channel_stack_builder_prepend_filter(
1864         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1865   }
1866   return true;
1867 }
1868
1869 }  // namespace
1870
1871 void grpc_lb_policy_grpclb_init() {
1872   grpc_core::LoadBalancingPolicyRegistry::Builder::
1873       RegisterLoadBalancingPolicyFactory(
1874           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1875               grpc_core::New<grpc_core::GrpcLbFactory>()));
1876   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1877                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1878                                    maybe_add_client_load_reporting_filter,
1879                                    (void*)&grpc_client_load_reporting_filter);
1880 }
1881
// Plugin shutdown hook.  Intentionally empty.
void grpc_lb_policy_grpclb_shutdown() {
}