3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
22 #include <grpc/support/port_platform.h>
26 #include <grpc/support/alloc.h>
28 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
29 #include "src/core/ext/filters/client_channel/server_address.h"
30 // TODO(roth): Should not need the include of subchannel.h here, since
31 // that implementation should be hidden from the LB policy API.
32 #include "src/core/ext/filters/client_channel/subchannel.h"
33 #include "src/core/ext/filters/client_channel/subchannel_interface.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/debug/trace.h"
36 #include "src/core/lib/gprpp/abstract.h"
37 #include "src/core/lib/gprpp/inlined_vector.h"
38 #include "src/core/lib/gprpp/orphanable.h"
39 #include "src/core/lib/gprpp/ref_counted.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/closure.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
45 // Code for maintaining a list of subchannels within an LB policy.
47 // To use this, callers must create their own subclasses, like so:
50 class MySubchannelList; // Forward declaration.
52 class MySubchannelData
53 : public SubchannelData<MySubchannelList, MySubchannelData> {
55 void ProcessConnectivityChangeLocked(
56 grpc_connectivity_state connectivity_state) override {
57 // ...code to handle connectivity changes...
61 class MySubchannelList
62 : public SubchannelList<MySubchannelList, MySubchannelData> {
66 // All methods will be called from within the client_channel combiner.
70 // Forward declaration.
71 template <typename SubchannelListType, typename SubchannelDataType>
74 // Stores data for a particular subchannel in a subchannel list.
75 // Callers must create a subclass that implements the
76 // ProcessConnectivityChangeLocked() method.
// SubchannelListType and SubchannelDataType are the caller's concrete
// subclasses; this class casts to them in subchannel_list() and Index().
77 template <typename SubchannelListType, typename SubchannelDataType>
78 class SubchannelData {
80 // Returns a pointer to the subchannel list containing this object.
// The stored backpointer is downcast to the concrete list type.
81 SubchannelListType* subchannel_list() const {
82 return static_cast<SubchannelListType*>(subchannel_list_);
85 // Returns the index into the subchannel list of this object.
// Computed as the pointer distance between this object and element 0 of
// the list, so it is only meaningful for an object stored in that list.
86 size_t Index() const {
87 return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
88 subchannel_list_->subchannel(0));
91 // Returns a pointer to the subchannel.
92 SubchannelInterface* subchannel() const { return subchannel_.get(); }
94 // Synchronously checks the subchannel's connectivity state.
95 // Must not be called while there is a connectivity notification
96 // pending (i.e., between calling StartConnectivityWatchLocked() and
97 // calling CancelConnectivityWatchLocked()).
// The precondition is enforced with an assertion, and the result is cached
// in connectivity_state_ before being returned.
98 grpc_connectivity_state CheckConnectivityStateLocked() {
99 GPR_ASSERT(pending_watcher_ == nullptr);
100 connectivity_state_ = subchannel_->CheckConnectivityState();
101 return connectivity_state_;
104 // Resets the connection backoff.
105 // TODO(roth): This method should go away when we move the backoff
106 // code out of the subchannel and into the LB policies.
107 void ResetBackoffLocked();
109 // Starts watching the connectivity state of the subchannel.
110 // ProcessConnectivityChangeLocked() will be called whenever the
111 // connectivity state changes.
112 void StartConnectivityWatchLocked();
114 // Cancels watching the connectivity state of the subchannel.
// reason is a caller-supplied label that is included in the trace message.
115 void CancelConnectivityWatchLocked(const char* reason);
117 // Cancels any pending connectivity watch and unrefs the subchannel.
118 void ShutdownLocked();
120 GRPC_ABSTRACT_BASE_CLASS
124 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
125 const ServerAddress& address,
126 RefCountedPtr<SubchannelInterface> subchannel);
128 virtual ~SubchannelData();
130 // After StartConnectivityWatchLocked() is called, this method will be
131 // invoked whenever the subchannel's connectivity state changes.
132 // To stop watching, use CancelConnectivityWatchLocked().
// connectivity_state is the newly reported state; it has already been stored
// in connectivity_state_ by the time this method is invoked by the watcher.
133 virtual void ProcessConnectivityChangeLocked(
134 grpc_connectivity_state connectivity_state) GRPC_ABSTRACT;
137 // Watcher for subchannel connectivity state.
// Holds a ref to the subchannel list; the ref is dropped in the destructor
// with the tag "Watcher dtor".
139 : public SubchannelInterface::ConnectivityStateWatcherInterface {
142 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
143 RefCountedPtr<SubchannelListType> subchannel_list)
144 : subchannel_data_(subchannel_data),
145 subchannel_list_(std::move(subchannel_list)) {}
147 ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
149 void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
// Delegates to the owning policy's interested_parties().
151 grpc_pollset_set* interested_parties() override {
152 return subchannel_list_->policy()->interested_parties();
// Raw pointer to the SubchannelData this watcher reports to.
156 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
157 RefCountedPtr<SubchannelListType> subchannel_list_;
160 // Unrefs the subchannel.
161 void UnrefSubchannelLocked(const char* reason);
163 // Backpointer to owning subchannel list. Not owned.
164 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
166 RefCountedPtr<SubchannelInterface> subchannel_;
167 // Will be non-null when the subchannel's state is being watched.
168 SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
170 // Data updated by the watcher.
// Also overwritten by CheckConnectivityStateLocked().
171 grpc_connectivity_state connectivity_state_;
174 // A list of subchannels.
// Ref-counted through InternallyRefCounted<SubchannelListType>; callers
// derive from this template and pass their own list and data subclasses as
// SubchannelListType and SubchannelDataType.
175 template <typename SubchannelListType, typename SubchannelDataType>
176 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
178 typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
180 // The number of subchannels in the list.
181 size_t num_subchannels() const { return subchannels_.size(); }
183 // The data for the subchannel at a particular index.
// The returned pointer refers to the element stored inside subchannels_.
184 SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
186 // Returns true if the subchannel list is shutting down.
187 bool shutting_down() const { return shutting_down_; }
// Accessors for the owning policy and the trace flag.
190 LoadBalancingPolicy* policy() const { return policy_; }
191 TraceFlag* tracer() const { return tracer_; }
193 // Resets connection backoff of all subchannels.
194 // TODO(roth): We will probably need to rethink this as part of moving
195 // the backoff code out of subchannels and into LB policies.
196 void ResetBackoffLocked();
// Drops a ref on this list, tagged "shutdown".
198 void Orphan() override {
200 InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
203 GRPC_ABSTRACT_BASE_CLASS
// Populates the list by requesting subchannels for addresses from helper,
// using args as the base channel args. tracer gates the trace messages
// produced by this class.
206 SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
207 const ServerAddressList& addresses,
208 LoadBalancingPolicy::ChannelControlHelper* helper,
209 const grpc_channel_args& args);
211 virtual ~SubchannelList();
214 // So New() can call our private ctor.
215 template <typename T, typename... Args>
216 friend T* New(Args&&... args);
218 // For accessing Ref() and Unref().
219 friend class SubchannelData<SubchannelListType, SubchannelDataType>;
// Marks the list as shutting down and shuts down every SubchannelData in it.
// Asserts that it has not been called before.
221 void ShutdownLocked();
223 // Backpointer to owning policy.
224 LoadBalancingPolicy* policy_;
228 // The list of subchannels.
229 SubchannelVector subchannels_;
231 // Is this list shutting down? This may be true due to the shutdown of the
232 // policy itself or because a newer update has arrived while this one hadn't
233 // finished processing.
234 bool shutting_down_ = false;
238 // implementation -- no user-serviceable parts below
242 // SubchannelData::Watcher
// Handles a connectivity state notification for the watched subchannel.
// When the list's tracer is enabled, a trace message describing the change
// (list, index, subchannel, new state, shutdown flag, pending watcher) is
// produced. The new state is then recorded on the SubchannelData and passed
// to ProcessConnectivityChangeLocked(), unless the notification is stale.
245 template <typename SubchannelListType, typename SubchannelDataType>
246 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
247 OnConnectivityStateChange(grpc_connectivity_state new_state) {
248 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
250 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
251 " (subchannel %p): connectivity changed: state=%s, "
252 "shutting_down=%d, pending_watcher=%p",
253 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
254 subchannel_list_.get(), subchannel_data_->Index(),
255 subchannel_list_->num_subchannels(),
256 subchannel_data_->subchannel_.get(),
257 grpc_connectivity_state_name(new_state),
258 subchannel_list_->shutting_down(),
259 subchannel_data_->pending_watcher_);
// Drop notifications that arrive once the list is shutting down or after the
// watch was cancelled (CancelConnectivityWatchLocked() nulls pending_watcher_).
261 if (!subchannel_list_->shutting_down() &&
262 subchannel_data_->pending_watcher_ != nullptr) {
263 subchannel_data_->connectivity_state_ = new_state;
264 // Call the subclass's ProcessConnectivityChangeLocked() method.
265 subchannel_data_->ProcessConnectivityChangeLocked(new_state);
// Stores the backpointer to the owning list and takes over the subchannel
// reference passed in. The address parameter is not used by any of the
// initializers below, and the constructor body is empty.
273 template <typename SubchannelListType, typename SubchannelDataType>
274 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
275 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
276 const ServerAddress& address, RefCountedPtr<SubchannelInterface> subchannel)
277 : subchannel_list_(subchannel_list),
278 subchannel_(std::move(subchannel)),
279 // We assume that the current state is IDLE. If not, we'll get a
280 // callback telling us that.
281 connectivity_state_(GRPC_CHANNEL_IDLE) {}
// Asserts that the subchannel reference has already been released, i.e. that
// subchannel_ is null by the time the object is destroyed.
// NOTE(review): presumably this requires ShutdownLocked() to have run first;
// confirm against UnrefSubchannelLocked().
283 template <typename SubchannelListType, typename SubchannelDataType>
284 SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
285 GPR_ASSERT(subchannel_ == nullptr);
// Unrefs the subchannel. The work is guarded by a null check on subchannel_,
// so the guarded section is skipped when no subchannel is held. When the
// list's tracer is enabled, a trace message identifying the list, this
// object's index and the subchannel is produced.
// NOTE(review): reason does not appear in the trace format string below.
288 template <typename SubchannelListType, typename SubchannelDataType>
289 void SubchannelData<SubchannelListType, SubchannelDataType>::
290 UnrefSubchannelLocked(const char* reason) {
291 if (subchannel_ != nullptr) {
292 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
294 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
295 " (subchannel %p): unreffing subchannel",
296 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
297 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
// Forwards to the subchannel's ResetBackoff() when a subchannel is still
// held; the call is skipped when subchannel_ is null.
304 template <typename SubchannelListType, typename SubchannelDataType>
305 void SubchannelData<SubchannelListType,
306 SubchannelDataType>::ResetBackoffLocked() {
307 if (subchannel_ != nullptr) {
308 subchannel_->ResetBackoff();
// Starts a connectivity watch on the subchannel. Asserts that no watch is
// already pending, creates a Watcher that holds a new ref on the subchannel
// list (tagged "Watcher"), and hands it to WatchConnectivityState().
// When the list's tracer is enabled, the trace message includes the name of
// the last recorded connectivity state.
312 template <typename SubchannelListType, typename SubchannelDataType>
313 void SubchannelData<SubchannelListType,
314 SubchannelDataType>::StartConnectivityWatchLocked() {
315 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
317 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
318 " (subchannel %p): starting watch (from %s)",
319 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
320 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
322 grpc_connectivity_state_name(connectivity_state_));
// Only one watch may be outstanding per SubchannelData.
324 GPR_ASSERT(pending_watcher_ == nullptr);
326 New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
327 subchannel_->WatchConnectivityState(
329 UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>(
// Cancels the pending connectivity watch, if there is one, and clears
// pending_watcher_. reason is only used in the trace message produced when
// the list's tracer is enabled.
333 template <typename SubchannelListType, typename SubchannelDataType>
334 void SubchannelData<SubchannelListType, SubchannelDataType>::
335 CancelConnectivityWatchLocked(const char* reason) {
336 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
338 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
339 " (subchannel %p): canceling connectivity watch (%s)",
340 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
341 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
342 subchannel_.get(), reason);
// Nothing to cancel when no watch is pending.
344 if (pending_watcher_ != nullptr) {
345 subchannel_->CancelConnectivityStateWatch(pending_watcher_);
346 pending_watcher_ = nullptr;
// Cancels the pending connectivity watch, if any, and then unrefs the
// subchannel. Both steps are tagged with the reason "shutdown".
350 template <typename SubchannelListType, typename SubchannelDataType>
351 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
352 if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
353 UnrefSubchannelLocked("shutdown");
// Builds the subchannel list from addresses. For each address processed by
// the loop, the base channel args (args, with GRPC_ARG_SUBCHANNEL_ADDRESS and
// GRPC_ARG_SERVICE_CONFIG listed for removal) are combined with a
// subchannel-address arg plus any per-address args, a subchannel is requested
// from helper, and a SubchannelDataType entry is appended for it.
// Balancer addresses and null results from CreateSubchannel() have their own
// branches below.
360 template <typename SubchannelListType, typename SubchannelDataType>
361 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
362 LoadBalancingPolicy* policy, TraceFlag* tracer,
363 const ServerAddressList& addresses,
364 LoadBalancingPolicy::ChannelControlHelper* helper,
365 const grpc_channel_args& args)
366 : InternallyRefCounted<SubchannelListType>(tracer),
369 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
371 "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
372 tracer_->name(), policy, this, addresses.size());
// Reserve room for one entry per address up front.
374 subchannels_.reserve(addresses.size());
375 // We need to remove the LB addresses in order to be able to compare the
376 // subchannel keys of subchannels from a different batch of addresses.
377 // We remove the service config, since it will be passed into the
378 // subchannel via call context.
379 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
380 GRPC_ARG_SERVICE_CONFIG};
381 // Create a subchannel for each address.
382 for (size_t i = 0; i < addresses.size(); i++) {
383 // TODO(roth): we should ideally hide this from the LB policy code. In
384 // principle, if we're dealing with this special case in the client_channel
385 // code for selecting grpclb, then we should also strip out these addresses
386 // there if we're not using grpclb.
387 if (addresses[i].IsBalancer()) {
// Extra args for this subchannel: its address arg first (its position is
// remembered so its string can be freed later), then any per-address args.
390 InlinedVector<grpc_arg, 3> args_to_add;
391 const size_t subchannel_address_arg_index = args_to_add.size();
392 args_to_add.emplace_back(
393 Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
394 if (addresses[i].args() != nullptr) {
395 for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
396 args_to_add.emplace_back(addresses[i].args()->args[j]);
399 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
400 &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
401 args_to_add.data(), args_to_add.size());
// Release the address arg's string now that new_args has been built.
402 gpr_free(args_to_add[subchannel_address_arg_index].value.string);
403 RefCountedPtr<SubchannelInterface> subchannel =
404 helper->CreateSubchannel(*new_args);
// new_args was only needed for the CreateSubchannel() call above.
405 grpc_channel_args_destroy(new_args);
406 if (subchannel == nullptr) {
407 // Subchannel could not be created.
408 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
409 char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
411 "[%s %p] could not create subchannel for address uri %s, "
413 tracer_->name(), policy_, address_uri);
414 gpr_free(address_uri);
// The index logged here is subchannels_.size(), i.e. the position the new
// entry receives when it is appended below.
418 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
419 char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
421 "[%s %p] subchannel list %p index %" PRIuPTR
422 ": Created subchannel %p for address uri %s",
423 tracer_->name(), policy_, this, subchannels_.size(),
424 subchannel.get(), address_uri);
425 gpr_free(address_uri);
// Construct the SubchannelDataType in place; it takes over the subchannel ref.
427 subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
// Logs the destruction of the list at INFO level when the tracer is enabled.
431 template <typename SubchannelListType, typename SubchannelDataType>
432 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
433 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
434 gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
// Marks the list as shutting down and calls ShutdownLocked() on every
// SubchannelData in it. Asserts that the list was not already shutting down.
439 template <typename SubchannelListType, typename SubchannelDataType>
440 void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
441 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
442 gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
443 tracer_->name(), policy_, this);
445 GPR_ASSERT(!shutting_down_);
// Set the flag before touching the entries, so shutting_down() already
// returns true while each entry is being shut down.
446 shutting_down_ = true;
447 for (size_t i = 0; i < subchannels_.size(); i++) {
448 SubchannelDataType* sd = &subchannels_[i];
449 sd->ShutdownLocked();
// Calls ResetBackoffLocked() on every SubchannelData in the list, in index
// order.
453 template <typename SubchannelListType, typename SubchannelDataType>
454 void SubchannelList<SubchannelListType,
455 SubchannelDataType>::ResetBackoffLocked() {
456 for (size_t i = 0; i < subchannels_.size(); i++) {
457 SubchannelDataType* sd = &subchannels_[i];
458 sd->ResetBackoffLocked();
462 } // namespace grpc_core
464 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */