Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / lb_policy / subchannel_list.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <string.h>
25
26 #include <grpc/support/alloc.h>
27
28 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
29 #include "src/core/ext/filters/client_channel/server_address.h"
30 // TODO(roth): Should not need the include of subchannel.h here, since
31 // that implementation should be hidden from the LB policy API.
32 #include "src/core/ext/filters/client_channel/subchannel.h"
33 #include "src/core/ext/filters/client_channel/subchannel_interface.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/debug/trace.h"
36 #include "src/core/lib/gprpp/abstract.h"
37 #include "src/core/lib/gprpp/inlined_vector.h"
38 #include "src/core/lib/gprpp/orphanable.h"
39 #include "src/core/lib/gprpp/ref_counted.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/closure.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44
45 // Code for maintaining a list of subchannels within an LB policy.
46 //
47 // To use this, callers must create their own subclasses, like so:
48 /*
49
50 class MySubchannelList;  // Forward declaration.
51
52 class MySubchannelData
53     : public SubchannelData<MySubchannelList, MySubchannelData> {
54  public:
55   void ProcessConnectivityChangeLocked(
56       grpc_connectivity_state connectivity_state) override {
57     // ...code to handle connectivity changes...
58   }
59 };
60
61 class MySubchannelList
62     : public SubchannelList<MySubchannelList, MySubchannelData> {
63 };
64
65 */
66 // All methods will be called from within the client_channel combiner.
67
68 namespace grpc_core {
69
70 // Forward declaration.
71 template <typename SubchannelListType, typename SubchannelDataType>
72 class SubchannelList;
73
74 // Stores data for a particular subchannel in a subchannel list.
75 // Callers must create a subclass that implements the
76 // ProcessConnectivityChangeLocked() method.
77 template <typename SubchannelListType, typename SubchannelDataType>
78 class SubchannelData {
79  public:
80   // Returns a pointer to the subchannel list containing this object.
81   SubchannelListType* subchannel_list() const {
82     return static_cast<SubchannelListType*>(subchannel_list_);
83   }
84
85   // Returns the index into the subchannel list of this object.
86   size_t Index() const {
87     return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
88                                subchannel_list_->subchannel(0));
89   }
90
91   // Returns a pointer to the subchannel.
92   SubchannelInterface* subchannel() const { return subchannel_.get(); }
93
94   // Synchronously checks the subchannel's connectivity state.
95   // Must not be called while there is a connectivity notification
96   // pending (i.e., between calling StartConnectivityWatchLocked() and
97   // calling CancelConnectivityWatchLocked()).
98   grpc_connectivity_state CheckConnectivityStateLocked() {
99     GPR_ASSERT(pending_watcher_ == nullptr);
100     connectivity_state_ = subchannel_->CheckConnectivityState();
101     return connectivity_state_;
102   }
103
104   // Resets the connection backoff.
105   // TODO(roth): This method should go away when we move the backoff
106   // code out of the subchannel and into the LB policies.
107   void ResetBackoffLocked();
108
109   // Starts watching the connectivity state of the subchannel.
110   // ProcessConnectivityChangeLocked() will be called whenever the
111   // connectivity state changes.
112   void StartConnectivityWatchLocked();
113
114   // Cancels watching the connectivity state of the subchannel.
115   void CancelConnectivityWatchLocked(const char* reason);
116
117   // Cancels any pending connectivity watch and unrefs the subchannel.
118   void ShutdownLocked();
119
120   GRPC_ABSTRACT_BASE_CLASS
121
122  protected:
123   SubchannelData(
124       SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
125       const ServerAddress& address,
126       RefCountedPtr<SubchannelInterface> subchannel);
127
128   virtual ~SubchannelData();
129
130   // After StartConnectivityWatchLocked() is called, this method will be
131   // invoked whenever the subchannel's connectivity state changes.
132   // To stop watching, use CancelConnectivityWatchLocked().
133   virtual void ProcessConnectivityChangeLocked(
134       grpc_connectivity_state connectivity_state) GRPC_ABSTRACT;
135
136  private:
137   // Watcher for subchannel connectivity state.
138   class Watcher
139       : public SubchannelInterface::ConnectivityStateWatcherInterface {
140    public:
141     Watcher(
142         SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
143         RefCountedPtr<SubchannelListType> subchannel_list)
144         : subchannel_data_(subchannel_data),
145           subchannel_list_(std::move(subchannel_list)) {}
146
147     ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
148
149     void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
150
151     grpc_pollset_set* interested_parties() override {
152       return subchannel_list_->policy()->interested_parties();
153     }
154
155    private:
156     SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
157     RefCountedPtr<SubchannelListType> subchannel_list_;
158   };
159
160   // Unrefs the subchannel.
161   void UnrefSubchannelLocked(const char* reason);
162
163   // Backpointer to owning subchannel list.  Not owned.
164   SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
165   // The subchannel.
166   RefCountedPtr<SubchannelInterface> subchannel_;
167   // Will be non-null when the subchannel's state is being watched.
168   SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
169       nullptr;
170   // Data updated by the watcher.
171   grpc_connectivity_state connectivity_state_;
172 };
173
174 // A list of subchannels.
175 template <typename SubchannelListType, typename SubchannelDataType>
176 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
177  public:
178   typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
179
180   // The number of subchannels in the list.
181   size_t num_subchannels() const { return subchannels_.size(); }
182
183   // The data for the subchannel at a particular index.
184   SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
185
186   // Returns true if the subchannel list is shutting down.
187   bool shutting_down() const { return shutting_down_; }
188
189   // Accessors.
190   LoadBalancingPolicy* policy() const { return policy_; }
191   TraceFlag* tracer() const { return tracer_; }
192
193   // Resets connection backoff of all subchannels.
194   // TODO(roth): We will probably need to rethink this as part of moving
195   // the backoff code out of subchannels and into LB policies.
196   void ResetBackoffLocked();
197
198   void Orphan() override {
199     ShutdownLocked();
200     InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
201   }
202
203   GRPC_ABSTRACT_BASE_CLASS
204
205  protected:
206   SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
207                  const ServerAddressList& addresses,
208                  LoadBalancingPolicy::ChannelControlHelper* helper,
209                  const grpc_channel_args& args);
210
211   virtual ~SubchannelList();
212
213  private:
214   // So New() can call our private ctor.
215   template <typename T, typename... Args>
216   friend T* New(Args&&... args);
217
218   // For accessing Ref() and Unref().
219   friend class SubchannelData<SubchannelListType, SubchannelDataType>;
220
221   void ShutdownLocked();
222
223   // Backpointer to owning policy.
224   LoadBalancingPolicy* policy_;
225
226   TraceFlag* tracer_;
227
228   // The list of subchannels.
229   SubchannelVector subchannels_;
230
231   // Is this list shutting down? This may be true due to the shutdown of the
232   // policy itself or because a newer update has arrived while this one hadn't
233   // finished processing.
234   bool shutting_down_ = false;
235 };
236
237 //
238 // implementation -- no user-serviceable parts below
239 //
240
241 //
242 // SubchannelData::Watcher
243 //
244
245 template <typename SubchannelListType, typename SubchannelDataType>
246 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
247     OnConnectivityStateChange(grpc_connectivity_state new_state) {
248   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
249     gpr_log(GPR_INFO,
250             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
251             " (subchannel %p): connectivity changed: state=%s, "
252             "shutting_down=%d, pending_watcher=%p",
253             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
254             subchannel_list_.get(), subchannel_data_->Index(),
255             subchannel_list_->num_subchannels(),
256             subchannel_data_->subchannel_.get(),
257             grpc_connectivity_state_name(new_state),
258             subchannel_list_->shutting_down(),
259             subchannel_data_->pending_watcher_);
260   }
261   if (!subchannel_list_->shutting_down() &&
262       subchannel_data_->pending_watcher_ != nullptr) {
263     subchannel_data_->connectivity_state_ = new_state;
264     // Call the subclass's ProcessConnectivityChangeLocked() method.
265     subchannel_data_->ProcessConnectivityChangeLocked(new_state);
266   }
267 }
268
269 //
270 // SubchannelData
271 //
272
273 template <typename SubchannelListType, typename SubchannelDataType>
274 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
275     SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
276     const ServerAddress& address, RefCountedPtr<SubchannelInterface> subchannel)
277     : subchannel_list_(subchannel_list),
278       subchannel_(std::move(subchannel)),
279       // We assume that the current state is IDLE.  If not, we'll get a
280       // callback telling us that.
281       connectivity_state_(GRPC_CHANNEL_IDLE) {}
282
283 template <typename SubchannelListType, typename SubchannelDataType>
284 SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
285   GPR_ASSERT(subchannel_ == nullptr);
286 }
287
288 template <typename SubchannelListType, typename SubchannelDataType>
289 void SubchannelData<SubchannelListType, SubchannelDataType>::
290     UnrefSubchannelLocked(const char* reason) {
291   if (subchannel_ != nullptr) {
292     if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
293       gpr_log(GPR_INFO,
294               "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
295               " (subchannel %p): unreffing subchannel",
296               subchannel_list_->tracer()->name(), subchannel_list_->policy(),
297               subchannel_list_, Index(), subchannel_list_->num_subchannels(),
298               subchannel_.get());
299     }
300     subchannel_.reset();
301   }
302 }
303
304 template <typename SubchannelListType, typename SubchannelDataType>
305 void SubchannelData<SubchannelListType,
306                     SubchannelDataType>::ResetBackoffLocked() {
307   if (subchannel_ != nullptr) {
308     subchannel_->ResetBackoff();
309   }
310 }
311
312 template <typename SubchannelListType, typename SubchannelDataType>
313 void SubchannelData<SubchannelListType,
314                     SubchannelDataType>::StartConnectivityWatchLocked() {
315   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
316     gpr_log(GPR_INFO,
317             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
318             " (subchannel %p): starting watch (from %s)",
319             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
320             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
321             subchannel_.get(),
322             grpc_connectivity_state_name(connectivity_state_));
323   }
324   GPR_ASSERT(pending_watcher_ == nullptr);
325   pending_watcher_ =
326       New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
327   subchannel_->WatchConnectivityState(
328       connectivity_state_,
329       UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>(
330           pending_watcher_));
331 }
332
333 template <typename SubchannelListType, typename SubchannelDataType>
334 void SubchannelData<SubchannelListType, SubchannelDataType>::
335     CancelConnectivityWatchLocked(const char* reason) {
336   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
337     gpr_log(GPR_INFO,
338             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
339             " (subchannel %p): canceling connectivity watch (%s)",
340             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
341             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
342             subchannel_.get(), reason);
343   }
344   if (pending_watcher_ != nullptr) {
345     subchannel_->CancelConnectivityStateWatch(pending_watcher_);
346     pending_watcher_ = nullptr;
347   }
348 }
349
350 template <typename SubchannelListType, typename SubchannelDataType>
351 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
352   if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
353   UnrefSubchannelLocked("shutdown");
354 }
355
356 //
357 // SubchannelList
358 //
359
360 template <typename SubchannelListType, typename SubchannelDataType>
361 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
362     LoadBalancingPolicy* policy, TraceFlag* tracer,
363     const ServerAddressList& addresses,
364     LoadBalancingPolicy::ChannelControlHelper* helper,
365     const grpc_channel_args& args)
366     : InternallyRefCounted<SubchannelListType>(tracer),
367       policy_(policy),
368       tracer_(tracer) {
369   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
370     gpr_log(GPR_INFO,
371             "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
372             tracer_->name(), policy, this, addresses.size());
373   }
374   subchannels_.reserve(addresses.size());
375   // We need to remove the LB addresses in order to be able to compare the
376   // subchannel keys of subchannels from a different batch of addresses.
377   // We remove the service config, since it will be passed into the
378   // subchannel via call context.
379   static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
380                                          GRPC_ARG_SERVICE_CONFIG};
381   // Create a subchannel for each address.
382   for (size_t i = 0; i < addresses.size(); i++) {
383     // TODO(roth): we should ideally hide this from the LB policy code. In
384     // principle, if we're dealing with this special case in the client_channel
385     // code for selecting grpclb, then we should also strip out these addresses
386     // there if we're not using grpclb.
387     if (addresses[i].IsBalancer()) {
388       continue;
389     }
390     InlinedVector<grpc_arg, 3> args_to_add;
391     const size_t subchannel_address_arg_index = args_to_add.size();
392     args_to_add.emplace_back(
393         Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
394     if (addresses[i].args() != nullptr) {
395       for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
396         args_to_add.emplace_back(addresses[i].args()->args[j]);
397       }
398     }
399     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
400         &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
401         args_to_add.data(), args_to_add.size());
402     gpr_free(args_to_add[subchannel_address_arg_index].value.string);
403     RefCountedPtr<SubchannelInterface> subchannel =
404         helper->CreateSubchannel(*new_args);
405     grpc_channel_args_destroy(new_args);
406     if (subchannel == nullptr) {
407       // Subchannel could not be created.
408       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
409         char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
410         gpr_log(GPR_INFO,
411                 "[%s %p] could not create subchannel for address uri %s, "
412                 "ignoring",
413                 tracer_->name(), policy_, address_uri);
414         gpr_free(address_uri);
415       }
416       continue;
417     }
418     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
419       char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
420       gpr_log(GPR_INFO,
421               "[%s %p] subchannel list %p index %" PRIuPTR
422               ": Created subchannel %p for address uri %s",
423               tracer_->name(), policy_, this, subchannels_.size(),
424               subchannel.get(), address_uri);
425       gpr_free(address_uri);
426     }
427     subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
428   }
429 }
430
431 template <typename SubchannelListType, typename SubchannelDataType>
432 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
433   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
434     gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
435             policy_, this);
436   }
437 }
438
439 template <typename SubchannelListType, typename SubchannelDataType>
440 void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
441   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
442     gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
443             tracer_->name(), policy_, this);
444   }
445   GPR_ASSERT(!shutting_down_);
446   shutting_down_ = true;
447   for (size_t i = 0; i < subchannels_.size(); i++) {
448     SubchannelDataType* sd = &subchannels_[i];
449     sd->ShutdownLocked();
450   }
451 }
452
453 template <typename SubchannelListType, typename SubchannelDataType>
454 void SubchannelList<SubchannelListType,
455                     SubchannelDataType>::ResetBackoffLocked() {
456   for (size_t i = 0; i < subchannels_.size(); i++) {
457     SubchannelDataType* sd = &subchannels_[i];
458     sd->ResetBackoffLocked();
459   }
460 }
461
462 }  // namespace grpc_core
463
464 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */