/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
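// Note: A jitter of 0.2 is intended to scale each computed retry backoff
// interval by a random factor in roughly [0.8, 1.2] (see BackOff's jitter
// option), so that retries from many calls do not synchronize.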

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6
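// Note: One slot per op type suffices because the surface API permits at
// most one in-flight batch per op type on a call at any given time.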

namespace grpc_core {

TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");

namespace {

//
// ChannelData definition
//

class ChannelData {
 public:
  struct QueuedPick {
    grpc_call_element* elem;
    QueuedPick* next = nullptr;
  };
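  // Note: QueuedPick entries form an intrusive singly-linked list of calls
  // waiting for a subchannel pick; see queued_picks_ below.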

  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  Mutex* data_plane_mu() const { return &data_plane_mu_; }

  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
  void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);

  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }

  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
      SubchannelInterface* subchannel) const;

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    // Will delete itself.
    New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
                                     watcher_timer_init);
  }
  int NumExternalConnectivityWatchers() const {
    return external_connectivity_watcher_list_.size();
  }

 private:
  class SubchannelWrapper;
  class ClientChannelControlHelper;

  class ExternalConnectivityWatcher {
   public:
    class WatcherList {
     public:
      WatcherList() { gpr_mu_init(&mu_); }
      ~WatcherList() { gpr_mu_destroy(&mu_); }

      int size() const;
      ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
      void Add(ExternalConnectivityWatcher* watcher);
      void Remove(const ExternalConnectivityWatcher* watcher);

     private:
      // head_ is guarded by a mutex, since the size() method needs to
      // iterate over the list, and it's called from the C-core API
      // function grpc_channel_num_external_connectivity_watchers(), which
      // is synchronous and therefore cannot run in the combiner.
      mutable gpr_mu mu_;
      ExternalConnectivityWatcher* head_ = nullptr;
    };

    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher();

   private:
    static void OnWatchCompleteLocked(void* arg, grpc_error* error);
    static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    grpc_closure my_closure_;
    ExternalConnectivityWatcher* next_ = nullptr;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  void UpdateStateAndPickerLocked(
      grpc_connectivity_state state, const char* reason,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker);

  void UpdateServiceConfigLocked(
      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
      RefCountedPtr<ServiceConfig> service_config);

  void CreateResolvingLoadBalancingPolicyLocked();

  void DestroyResolvingLoadBalancingPolicyLocked();

  static bool ProcessResolverResultLocked(
      void* arg, const Resolver::Result& result, const char** lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
      grpc_error** service_config_error);

  grpc_error* DoPingLocked(grpc_transport_op* op);

  static void StartTransportOpLocked(void* arg, grpc_error* ignored);

  static void TryToConnectLocked(void* arg, grpc_error* error_ignored);

  void ProcessLbPolicy(
      const Resolver::Result& resolver_result,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      UniquePtr<char>* lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  UniquePtr<char> server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields used in the data plane. Guarded by data_plane_mu.
  //
  mutable Mutex data_plane_mu_;
  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
  QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
  // Data from service config.
  bool received_service_config_data_ = false;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  RefCountedPtr<ServiceConfig> service_config_;

  //
  // Fields used in the control plane. Guarded by combiner.
  //
  grpc_combiner* combiner_;
  grpc_pollset_set* interested_parties_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
  grpc_connectivity_state_tracker state_tracker_;
  ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
  UniquePtr<char> health_check_service_name_;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  bool received_first_resolver_result_ = false;
  // The number of SubchannelWrapper instances referencing a given Subchannel.
  Map<Subchannel*, int> subchannel_refcount_map_;
  // The set of SubchannelWrappers that currently exist.
  // No need to hold a ref, since the map is updated in the control-plane
  // combiner when the SubchannelWrappers are created and destroyed.
  // TODO(roth): We really want to use a set here, not a map. Since we don't
  // currently have a set implementation, we use a map and ignore the value.
  Map<SubchannelWrapper*, bool> subchannel_wrappers_;
  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
  // Updates are queued here in the control plane combiner and then applied
  // in the data plane mutex when the picker is updated.
  Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
      RefCountedPtrLess<SubchannelWrapper>>
      pending_subchannel_updates_;

  //
  // Fields accessed from both data plane mutex and control plane combiner.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;
};
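
// Note: ChannelData's mutable state is partitioned across three
// synchronization domains (data_plane_mu_, the control-plane combiner, and
// info_mu_); disconnect_error_ is the one field shared between the first
// two, which is why it is stored as an Atomic.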

//
// CallData definition
//

class CallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }

  // Invoked by channel for queued picks once resolver results are available.
  void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // Invoked by channel for queued picks when the picker is updated.
  static void PickSubchannel(void* arg, grpc_error* error);

  // Helper function for performing a pick while holding the data plane
  // mutex. Returns true if the pick is complete, in which case the caller
  // must invoke PickDone() or AsyncPickDone() with the returned error.
  bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error);

  // Schedules a callback to process the completed pick. The callback
  // will not run until after this method returns.
  void AsyncPickDone(grpc_call_element* elem, grpc_error* error);

 private:
  class QueuedPickCanceller;

  class Metadata : public LoadBalancingPolicy::MetadataInterface {
   public:
    Metadata(CallData* calld, grpc_metadata_batch* batch)
        : calld_(calld), batch_(batch) {}

    void Add(StringView key, StringView value) override {
      grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
          calld_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
      linked_mdelem->md = grpc_mdelem_from_slices(
          grpc_core::ExternallyManagedSlice(key.data(), key.size()),
          grpc_core::ExternallyManagedSlice(value.data(), value.size()));
      GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
                 GRPC_ERROR_NONE);
    }

    Iterator Begin() const override {
      static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(Iterator),
                    "iterator size too large");
      return reinterpret_cast<Iterator>(batch_->list.head);
    }
    bool IsEnd(Iterator it) const override {
      return reinterpret_cast<grpc_linked_mdelem*>(it) == nullptr;
    }
    void Next(Iterator* it) const override {
      *it = reinterpret_cast<Iterator>(
          reinterpret_cast<grpc_linked_mdelem*>(*it)->next);
    }
    StringView Key(Iterator it) const override {
      return StringViewFromSlice(
          GRPC_MDKEY(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
    }
    StringView Value(Iterator it) const override {
      return StringViewFromSlice(
          GRPC_MDVALUE(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
    }

    void Erase(Iterator* it) override {
      grpc_linked_mdelem* linked_mdelem =
          reinterpret_cast<grpc_linked_mdelem*>(*it);
      *it = reinterpret_cast<Iterator>(linked_mdelem->next);
      grpc_metadata_batch_remove(batch_, linked_mdelem);
    }

   private:
    CallData* calld_;
    grpc_metadata_batch* batch_;
  };
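
  // Note: Metadata::Iterator above is an opaque integer-like handle; the
  // class type-puns the underlying grpc_linked_mdelem* through it (see the
  // static_assert in Begin()), so iteration walks the batch's linked list
  // directly with no extra allocation.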

  class LbCallState : public LoadBalancingPolicy::CallState {
   public:
    explicit LbCallState(CallData* calld) : calld_(calld) {}

    void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }

    const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
        override {
      if (calld_->backend_metric_data_ == nullptr) {
        grpc_linked_mdelem* md = calld_->recv_trailing_metadata_->idx.named
                                     .x_endpoint_load_metrics_bin;
        if (md != nullptr) {
          calld_->backend_metric_data_ =
              ParseBackendMetricData(GRPC_MDVALUE(md->md), calld_->arena_);
        }
      }
      return calld_->backend_metric_data_;
    }

   private:
    CallData* calld_;
  };

  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(grpc_call_element* elem,
                                           int refcount, bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }
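    // Note: Unref() relies on gpr_unref() returning nonzero when the count
    // reaches zero, so Destroy() runs exactly once, on the last unref.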

    SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
                            int refcount, bool set_on_complete);
    // All dtor code must be added in `Destroy()`. This is because we may
    // call closures in `SubchannelCallBatchData` after they are unrefed by
    // `Unref()`, and msan would complain about accessing this class
    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }

    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RefCountedPtr<SubchannelCall> subchannel_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above. That would
    // save space but will also result in a data race because the compiler
    // will generate a 2-byte store which overwrites the metadata
    // fields upon setting this field.
    bool retry_dispatched : 1;
  };
  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached;
  };

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata(ChannelData* chand);
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(ChannelData* chand, size_t idx);
  void FreeCachedSendTrailingMetadata(ChannelData* chand);
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
                                       SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data. Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
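  // Note: These predicates let PendingBatchesFail() decide, based on the
  // closures it generated, whether it should yield the call combiner itself
  // or leave that responsibility to the caller.
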
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume(grpc_call_element* elem);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(grpc_call_element* elem,
                                 const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(grpc_call_element* elem,
                   SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
                  grpc_status_code status, grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
                     grpc_error* error, grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      grpc_error* error, CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      grpc_error* error, CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(grpc_call_element* elem,
                                 SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch. This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that we know the call is over, but the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata(grpc_call_element* elem);

  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops. Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  void CreateSubchannelCall(grpc_call_element* elem);
  // Invoked when a pick is completed, on both success and failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks.
  void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
  // Adds the call to the channel's list of queued picks.
  void AddCallToQueuedPicksLocked(grpc_call_element* elem);
  // Applies service config to the call. Must be invoked once we know
  // that the resolver has returned results to the channel.
  void ApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  ServiceConfig::CallData service_config_call_data_;
  const ClientChannelMethodParsedConfig* method_params_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  ChannelData::QueuedPick pick_;
  bool pick_queued_ = false;
  bool service_config_applied_ = false;
  QueuedPickCanceller* pick_canceller_ = nullptr;
  LbCallState lb_call_state_;
  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  void (*lb_recv_trailing_metadata_ready_)(
      void* user_data, grpc_error* error,
      LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
      LoadBalancingPolicy::CallState* call_state) = nullptr;
  void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
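  // Note: Closure used to schedule the pick-completion callback
  // (see AsyncPickDone() / PickDone() above).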
  grpc_closure pick_closure_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  grpc_polling_entity* pollent_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  // TODO(roth): Restructure this to eliminate use of ManualConstructor.
  ManualConstructor<BackOff> retry_backoff_;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // replaying if needed.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};

//
// ChannelData::SubchannelWrapper
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name and
// connected subchannel) from the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane combiner.
class ChannelData::SubchannelWrapper : public SubchannelInterface {
 public:
  SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                    UniquePtr<char> health_check_service_name)
      : SubchannelInterface(&grpc_client_channel_routing_trace),
        chand_(chand),
        subchannel_(subchannel),
        health_check_service_name_(std::move(health_check_service_name)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_);
    }
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      if (it == chand_->subchannel_refcount_map_.end()) {
        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
      }
      ++it->second;
    }
    chand_->subchannel_wrappers_[this] = true;
  }

  ~SubchannelWrapper() {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_);
    }
    chand_->subchannel_wrappers_.erase(this);
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
      --it->second;
      if (it->second == 0) {
        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
        chand_->subchannel_refcount_map_.erase(it);
      }
    }
    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }

  grpc_connectivity_state CheckConnectivityState() override {
    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
    grpc_connectivity_state connectivity_state =
        subchannel_->CheckConnectivityState(health_check_service_name_.get(),
                                            &connected_subchannel);
    MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
    return connectivity_state;
  }

  void WatchConnectivityState(
      grpc_connectivity_state initial_state,
      UniquePtr<ConnectivityStateWatcherInterface> watcher) override {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = New<WatcherWrapper>(std::move(watcher),
                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
                                          initial_state);
    subchannel_->WatchConnectivityState(
        initial_state,
        UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
        OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }

  void CancelConnectivityStateWatch(
      ConnectivityStateWatcherInterface* watcher) override {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
                                              it->second);
    watcher_map_.erase(it);
  }

  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  const grpc_channel_args* channel_args() override {
    return subchannel_->channel_args();
  }

  void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: subchannel wrapper %p: updating health check service "
              "name from \"%s\" to \"%s\"",
              chand_, this, health_check_service_name_.get(),
              health_check_service_name.get());
    }
    for (auto& p : watcher_map_) {
      WatcherWrapper*& watcher_wrapper = p.second;
      // Cancel the current watcher and create a new one using the new
      // health check service name.
      // TODO(roth): If there is not already an existing health watch
      // call for the new name, then the watcher will initially report
      // state CONNECTING. If the LB policy is currently reporting
      // state READY, this may cause it to switch to CONNECTING before
      // switching back to READY. This could cause a small delay for
      // RPCs being started on the channel. If/when this becomes a
      // problem, we may be able to handle it by waiting for the new
      // watcher to report READY before we use it to replace the old one.
      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
      subchannel_->CancelConnectivityStateWatch(
          health_check_service_name_.get(), watcher_wrapper);
      watcher_wrapper = replacement;
      subchannel_->WatchConnectivityState(
          replacement->last_seen_state(),
          UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
          OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
              replacement));
    }
    // Save the new health check service name.
    health_check_service_name_ = std::move(health_check_service_name);
  }

  // Caller must be holding the control-plane combiner.
  ConnectedSubchannel* connected_subchannel() const {
    return connected_subchannel_.get();
  }

  // Caller must be holding the data-plane mutex.
  ConnectedSubchannel* connected_subchannel_in_data_plane() const {
    return connected_subchannel_in_data_plane_.get();
  }
  void set_connected_subchannel_in_data_plane(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
  }

 private:
  // Subchannel and SubchannelInterface have different interfaces for
  // their respective ConnectivityStateWatcherInterface classes.
  // The one in Subchannel updates the ConnectedSubchannel along with
  // the state, whereas the one in SubchannelInterface does not expose
  // the ConnectedSubchannel.
  //
  // This wrapper provides a bridge between the two. It implements
  // Subchannel::ConnectivityStateWatcherInterface and wraps
  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
  // that was passed in by the LB policy. We pass an instance of this
  // class to the underlying Subchannel, and when we get updates from
  // the subchannel, we pass those on to the wrapped watcher to return
  // the update to the LB policy. This allows us to set the connected
  // subchannel before passing the result back to the LB policy.
  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
   public:
    WatcherWrapper(
        UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>
            watcher,
        RefCountedPtr<SubchannelWrapper> parent,
        grpc_connectivity_state initial_state)
        : watcher_(std::move(watcher)),
          parent_(std::move(parent)),
          last_seen_state_(initial_state) {}

    ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); }

    void Orphan() override { Unref(); }

    void OnConnectivityStateChange(
        grpc_connectivity_state new_state,
        RefCountedPtr<ConnectedSubchannel> connected_subchannel) override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: connectivity change for subchannel wrapper %p "
                "subchannel %p (connected_subchannel=%p state=%s); "
                "hopping into combiner",
                parent_->chand_, parent_.get(), parent_->subchannel_,
                connected_subchannel.get(),
                grpc_connectivity_state_name(new_state));
      }
      // Will delete itself.
      New<Updater>(Ref(), new_state, std::move(connected_subchannel));
    }

    grpc_pollset_set* interested_parties() override {
      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
          watcher_.get();
      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
      return watcher->interested_parties();
    }

    WatcherWrapper* MakeReplacement() {
      auto* replacement =
          New<WatcherWrapper>(std::move(watcher_), parent_, last_seen_state_);
      replacement_ = replacement;
      return replacement;
    }

    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }

   private:
    class Updater {
     public:
      Updater(RefCountedPtr<WatcherWrapper> parent,
              grpc_connectivity_state new_state,
              RefCountedPtr<ConnectedSubchannel> connected_subchannel)
          : parent_(std::move(parent)),
            state_(new_state),
            connected_subchannel_(std::move(connected_subchannel)) {
        GRPC_CLOSURE_INIT(
            &closure_, ApplyUpdateInControlPlaneCombiner, this,
            grpc_combiner_scheduler(parent_->parent_->chand_->combiner_));
        GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
      }

     private:
      static void ApplyUpdateInControlPlaneCombiner(void* arg,
                                                    grpc_error* error) {
        Updater* self = static_cast<Updater*>(arg);
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p: processing connectivity change in combiner "
                  "for subchannel wrapper %p subchannel %p "
                  "(connected_subchannel=%p state=%s): watcher=%p",
                  self->parent_->parent_->chand_, self->parent_->parent_.get(),
                  self->parent_->parent_->subchannel_,
                  self->connected_subchannel_.get(),
                  grpc_connectivity_state_name(self->state_),
                  self->parent_->watcher_.get());
        }
        // Ignore update if the parent WatcherWrapper has been replaced
        // since this callback was scheduled.
        if (self->parent_->watcher_ != nullptr) {
          self->parent_->last_seen_state_ = self->state_;
          self->parent_->parent_->MaybeUpdateConnectedSubchannel(
              std::move(self->connected_subchannel_));
          self->parent_->watcher_->OnConnectivityStateChange(self->state_);
        }
        Delete(self);
      }

      RefCountedPtr<WatcherWrapper> parent_;
      grpc_connectivity_state state_;
      RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
      grpc_closure closure_;
    };

    UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_;
    RefCountedPtr<SubchannelWrapper> parent_;
    grpc_connectivity_state last_seen_state_;
    WatcherWrapper* replacement_ = nullptr;
  };

  void MaybeUpdateConnectedSubchannel(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    // Update the connected subchannel only if the channel is not shutting
    // down. This is because once the channel is shutting down, we
    // ignore picker updates from the LB policy, which means that
    // UpdateStateAndPickerLocked() will never process the entries
    // in chand_->pending_subchannel_updates_. So we don't want to add
    // entries there that will never be processed, since that would
    // leave dangling refs to the channel and prevent its destruction.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (disconnect_error != GRPC_ERROR_NONE) return;
    // Not shutting down, so do the update.
    if (connected_subchannel_ != connected_subchannel) {
      connected_subchannel_ = std::move(connected_subchannel);
      // Record the new connected subchannel so that it can be updated
      // in the data plane mutex the next time the picker is updated.
      chand_->pending_subchannel_updates_[Ref(
          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
    }
  }

  ChannelData* chand_;
  Subchannel* subchannel_;
  UniquePtr<char> health_check_service_name_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WatcherWrapper that we passed to the underlying
  // subchannel. This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
  Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
  // To be accessed only in the control plane combiner.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  // To be accessed only in the data plane mutex.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
};

//
// ChannelData::ExternalConnectivityWatcher::WatcherList
//

int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
  MutexLock lock(&mu_);
  int count = 0;
  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
    ++count;
  }
  return count;
}

ChannelData::ExternalConnectivityWatcher*
ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
    grpc_closure* on_complete) const {
  MutexLock lock(&mu_);
  ExternalConnectivityWatcher* w = head_;
  while (w != nullptr && w->on_complete_ != on_complete) {
    w = w->next_;
  }
  return w;
}

void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
    ExternalConnectivityWatcher* watcher) {
  GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
  MutexLock lock(&mu_);
  GPR_ASSERT(watcher->next_ == nullptr);
  watcher->next_ = head_;
  head_ = watcher;
}

void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
    const ExternalConnectivityWatcher* watcher) {
  MutexLock lock(&mu_);
  if (watcher == head_) {
    head_ = watcher->next_;
    return;
  }
  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
    if (w->next_ == watcher) {
      w->next_ = w->next_->next_;
      return;
    }
  }
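  // Note: Falling out of the loop means the watcher was not in the list,
  // which callers are expected to guarantee never happens.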
  GPR_UNREACHABLE_CODE(return );
}

//
// ChannelData::ExternalConnectivityWatcher
//

ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ChannelData* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
                        grpc_combiner_scheduler(chand_->combiner_)),
      GRPC_ERROR_NONE);
}

ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}

void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
    void* arg, grpc_error* error) {
  ExternalConnectivityWatcher* self =
      static_cast<ExternalConnectivityWatcher*>(arg);
  grpc_closure* on_complete = self->on_complete_;
  self->chand_->external_connectivity_watcher_list_.Remove(self);
  Delete(self);
  GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}

void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
    void* arg, grpc_error* ignored) {
  ExternalConnectivityWatcher* self =
      static_cast<ExternalConnectivityWatcher*>(arg);
  if (self->state_ == nullptr) {
    // Handle cancellation.
    GPR_ASSERT(self->watcher_timer_init_ == nullptr);
    ExternalConnectivityWatcher* found =
        self->chand_->external_connectivity_watcher_list_.Lookup(
            self->on_complete_);
    if (found != nullptr) {
      grpc_connectivity_state_notify_on_state_change(
          &found->chand_->state_tracker_, nullptr, &found->my_closure_);
    }
    Delete(self);
    return;
  }
  // New watcher.
  self->chand_->external_connectivity_watcher_list_.Add(self);
  // This assumes that the closure is scheduled on the ExecCtx scheduler
  // and that GRPC_CLOSURE_RUN would run the closure immediately.
  GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
  GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
                    grpc_combiner_scheduler(self->chand_->combiner_));
  grpc_connectivity_state_notify_on_state_change(
      &self->chand_->state_tracker_, self->state_, &self->my_closure_);
}

//
// ChannelData::ClientChannelControlHelper
//

class ChannelData::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      const grpc_channel_args& args) override {
    bool inhibit_health_checking = grpc_channel_arg_get_bool(
        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
    UniquePtr<char> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name.reset(
          gpr_strdup(chand_->health_check_service_name_.get()));
    }
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
        chand_->subchannel_pool_.get());
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
    Subchannel* subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    return MakeRefCounted<SubchannelWrapper>(
        chand_, subchannel, std::move(health_check_service_name));
  }

  void UpdateState(
      grpc_connectivity_state state,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
              grpc_connectivity_state_name(state), picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker));
    }
  }

  // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
  void RequestReresolution() override {}

  void AddTraceEvent(TraceSeverity severity, StringView message) override {
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ChannelData* chand_;
};

//
// ChannelData implementation
//

grpc_error* ChannelData::Init(grpc_channel_element* elem,
                              grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  grpc_error* error = GRPC_ERROR_NONE;
  new (elem->channel_data) ChannelData(args, &error);
  return error;
}

void ChannelData::Destroy(grpc_channel_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

bool GetEnableRetries(const grpc_channel_args* args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
}

size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
  return static_cast<size_t>(grpc_channel_arg_get_integer(
      grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
      {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
}
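
// Note: The braced initializer above is a grpc_integer_options value of the
// form {default_value, min_value, max_value}, so the per-RPC retry buffer
// size is clamped to [0, INT_MAX] with a 256 KiB default.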

RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
    const grpc_channel_args* args) {
  const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
  if (use_local_subchannel_pool) {
    return MakeRefCounted<LocalSubchannelPool>();
  }
  return GlobalSubchannelPool::instance();
}
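
// Note: The global pool allows subchannel sharing across channels in the
// same process, whereas a local pool scopes subchannels to this channel only.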

channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
  const grpc_arg* arg =
      grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
    return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
  }
  return nullptr;
}

ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      per_rpc_retry_buffer_size_(
          GetMaxPerRpcRetryBufferSize(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      combiner_(grpc_combiner_create()),
      interested_parties_(grpc_pollset_set_create()),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Initialize data members.
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
                               "client_channel");
  gpr_mu_init(&info_mu_);
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get server name to resolve, using proxy mapper if needed.
  const char* server_uri = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json != nullptr) {
    *error = GRPC_ERROR_NONE;
    default_service_config_ = ServiceConfig::Create(service_config_json, error);
    if (*error != GRPC_ERROR_NONE) {
      default_service_config_.reset();
      return;
    }
  }
  grpc_uri* uri = grpc_uri_parse(server_uri, true);
  if (uri != nullptr && uri->path[0] != '\0') {
    server_name_.reset(
        gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
  }
  grpc_uri_destroy(uri);
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
                              &new_args);
  target_uri_.reset(proxy_name != nullptr ? proxy_name
                                          : gpr_strdup(server_uri));
  channel_args_ = new_args != nullptr
                      ? new_args
                      : grpc_channel_args_copy(args->channel_args);
  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
    *error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid.");
    return;
  }
  *error = GRPC_ERROR_NONE;
}

ChannelData::~ChannelData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolvingLoadBalancingPolicyLocked();
  grpc_channel_args_destroy(channel_args_);
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_COMBINER_UNREF(combiner_, "client_channel");
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
  grpc_connectivity_state_destroy(&state_tracker_);
  gpr_mu_destroy(&info_mu_);
}
void ChannelData::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const char* reason,
    UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Clean the control plane when entering IDLE.
  // Note: We check the incoming picker here; a null picker is how the
  // caller signals that we are entering IDLE.
  if (picker == nullptr) {
    health_check_service_name_.reset();
    saved_service_config_.reset();
    received_first_resolver_result_ = false;
  }
  // Update connectivity state.
  grpc_connectivity_state_set(&state_tracker_, state, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
  // Grab data plane lock to do subchannel updates and update the picker.
  //
  // Note that we want to minimize the work done while holding the data
  // plane lock, to keep the critical section small.  So, for all of the
  // objects that we might wind up unreffing here, we actually hold onto
  // the refs until after we release the lock, and then unref them at
  // that point.  This includes the following:
  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
  // - ref stored in retry_throttle_data_
  // - ref stored in service_config_
  // - ownership of the existing picker in picker_
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
  RefCountedPtr<ServiceConfig> service_config_to_unref;
  {
    MutexLock lock(&data_plane_mu_);
    // Handle subchannel updates.
    for (auto& p : pending_subchannel_updates_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: updating subchannel wrapper %p data plane "
                "connected_subchannel to %p",
                this, p.first.get(), p.second.get());
      }
      // Note: We do not remove the entry from pending_subchannel_updates_
      // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
    }
    // Swap out the picker.
    // Note: Original value will be destroyed after the lock is released.
    picker_.swap(picker);
    // Clean the data plane if the updated picker is nullptr.
    if (picker_ == nullptr) {
      received_service_config_data_ = false;
      // Note: We save the objects to unref until after the lock is released.
      retry_throttle_data_to_unref = std::move(retry_throttle_data_);
      service_config_to_unref = std::move(service_config_);
    }
    // Re-process queued picks.
    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
      grpc_call_element* elem = pick->elem;
      CallData* calld = static_cast<CallData*>(elem->call_data);
      grpc_error* error = GRPC_ERROR_NONE;
      if (calld->PickSubchannelLocked(elem, &error)) {
        calld->AsyncPickDone(elem, error);
      }
    }
  }
  // Clear the pending update map after releasing the lock, to keep the
  // critical section small.
  pending_subchannel_updates_.clear();
}

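// The deferral pattern used above, in miniature (an illustrative sketch;
// Foo is a hypothetical ref-counted type):
//
//   RefCountedPtr<Foo> foo_to_unref;
//   {
//     MutexLock lock(&data_plane_mu_);
//     foo_to_unref = std::move(foo_);  // Just a pointer move under the lock.
//   }
//   // The old object is unreffed here, after the lock is released, so any
//   // expensive destruction work happens outside the critical section.
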
void ChannelData::UpdateServiceConfigLocked(
    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
    RefCountedPtr<ServiceConfig> service_config) {
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&data_plane_mu_);
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    retry_throttle_data_.swap(retry_throttle_data);
    service_config_.swap(service_config);
    // Apply service config to queued picks.
    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
      CallData* calld = static_cast<CallData*>(pick->elem->call_data);
      calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
    }
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}

void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
  // Instantiate resolving LB policy.
  LoadBalancingPolicy::Args lb_args;
  lb_args.combiner = combiner_;
  lb_args.channel_control_helper =
      UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
          New<ClientChannelControlHelper>(this));
  lb_args.args = channel_args_;
  UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
  resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
      std::move(lb_args), &grpc_client_channel_routing_trace,
      std::move(target_uri), ProcessResolverResultLocked, this));
  grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
                                   interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
            resolving_lb_policy_.get());
  }
}

void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
  if (resolving_lb_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
                                     interested_parties_);
    resolving_lb_policy_.reset();
  }
}

void ChannelData::ProcessLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
    UniquePtr<char>* lb_policy_name,
    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
  // Prefer the LB policy name found in the service config.
  if (parsed_service_config != nullptr &&
      parsed_service_config->parsed_lb_config() != nullptr) {
    lb_policy_name->reset(
        gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
    *lb_policy_config = parsed_service_config->parsed_lb_config();
    return;
  }
  // Otherwise, find the LB policy name set by the client API.
  const char* local_policy_name = nullptr;
  if (parsed_service_config != nullptr &&
      parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
    local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    const grpc_arg* channel_arg =
        grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
    local_policy_name = grpc_channel_arg_get_string(channel_arg);
  }
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver has returned.
  bool found_balancer_address = false;
  for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
    const ServerAddress& address = resolver_result.addresses[i];
    if (address.IsBalancer()) {
      found_balancer_address = true;
      break;
    }
  }
  if (found_balancer_address) {
    if (local_policy_name != nullptr &&
        strcmp(local_policy_name, "grpclb") != 0) {
      gpr_log(GPR_INFO,
              "resolver requested LB policy %s but provided at least one "
              "balancer address -- forcing use of grpclb LB policy",
              local_policy_name);
    }
    local_policy_name = "grpclb";
  }
  // Use pick_first if nothing was specified and we didn't select grpclb
  // above.
  lb_policy_name->reset(gpr_strdup(
      local_policy_name == nullptr ? "pick_first" : local_policy_name));
}

// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
    void* arg, const Resolver::Result& result, const char** lb_policy_name,
    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
    grpc_error** service_config_error) {
  ChannelData* chand = static_cast<ChannelData*>(arg);
  RefCountedPtr<ServiceConfig> service_config;
  // If resolver did not return a service config or returned an invalid service
  // config, we need a fallback service config.
  if (result.service_config_error != GRPC_ERROR_NONE) {
    // If the service config was invalid, then fallback to the saved service
    // config.  If there is no saved config either, use the default service
    // config.
    if (chand->saved_service_config_ != nullptr) {
      service_config = chand->saved_service_config_;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                chand);
      }
    } else if (chand->default_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. Using "
                "default service config provided by client API.",
                chand);
      }
      service_config = chand->default_service_config_;
    }
  } else if (result.service_config == nullptr) {
    if (chand->default_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned no service config. Using default "
                "service config provided by client API.",
                chand);
      }
      service_config = chand->default_service_config_;
    }
  } else {
    service_config = result.service_config;
  }
  *service_config_error = GRPC_ERROR_REF(result.service_config_error);
  if (service_config == nullptr &&
      result.service_config_error != GRPC_ERROR_NONE) {
    return false;
  }
  // Process service config.
  UniquePtr<char> service_config_json;
  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
      nullptr;
  if (service_config != nullptr) {
    parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
  }
  // Check if the config has changed.
  const bool service_config_changed =
      ((service_config == nullptr) !=
       (chand->saved_service_config_ == nullptr)) ||
      (service_config != nullptr &&
       strcmp(service_config->service_config_json(),
              chand->saved_service_config_->service_config_json()) != 0);
  if (service_config_changed) {
    service_config_json.reset(gpr_strdup(
        service_config != nullptr ? service_config->service_config_json()
                                  : ""));
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned updated service config: \"%s\"",
              chand, service_config_json.get());
    }
    // Save health check service name.
    if (service_config != nullptr) {
      chand->health_check_service_name_.reset(
          gpr_strdup(parsed_service_config->health_check_service_name()));
    } else {
      chand->health_check_service_name_.reset();
    }
    // Update health check service name used by existing subchannel wrappers.
    for (const auto& p : chand->subchannel_wrappers_) {
      p.first->UpdateHealthCheckServiceName(
          UniquePtr<char>(gpr_strdup(chand->health_check_service_name_.get())));
    }
    // Save service config.
    chand->saved_service_config_ = std::move(service_config);
  }
  // We want to set the service config at least once, even if it has not
  // changed.  This should not really be needed, but we do it defensively;
  // it can be removed if it proves unnecessary.
  if (service_config_changed || !chand->received_first_resolver_result_) {
    chand->received_first_resolver_result_ = true;
    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
    if (parsed_service_config != nullptr) {
      Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
          retry_throttle_config = parsed_service_config->retry_throttling();
      if (retry_throttle_config.has_value()) {
        retry_throttle_data =
            internal::ServerRetryThrottleMap::GetDataForServer(
                chand->server_name_.get(),
                retry_throttle_config.value().max_milli_tokens,
                retry_throttle_config.value().milli_token_ratio);
      }
    }
    chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
                                     chand->saved_service_config_);
  }
  UniquePtr<char> processed_lb_policy_name;
  chand->ProcessLbPolicy(result, parsed_service_config,
                         &processed_lb_policy_name, lb_policy_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&chand->info_mu_);
    chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
    if (service_config_json != nullptr) {
      chand->info_service_config_json_ = std::move(service_config_json);
    }
  }
  // Return results.
  *lb_policy_name = chand->info_lb_policy_name_.get();
  return service_config_changed;
}

grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
  if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
  }
  LoadBalancingPolicy::PickResult result =
      picker_->Pick(LoadBalancingPolicy::PickArgs());
  ConnectedSubchannel* connected_subchannel = nullptr;
  if (result.subchannel != nullptr) {
    SubchannelWrapper* subchannel =
        static_cast<SubchannelWrapper*>(result.subchannel.get());
    connected_subchannel = subchannel->connected_subchannel();
  }
  if (connected_subchannel != nullptr) {
    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
  } else {
    if (result.error == GRPC_ERROR_NONE) {
      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return result.error;
}

void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Connectivity watch.
  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker_, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = chand->DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (chand->resolving_lb_policy_ != nullptr) {
      chand->resolving_lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", chand,
              grpc_error_string(op->disconnect_with_error));
    }
    chand->DestroyResolvingLoadBalancingPolicyLocked();
    intptr_t value;
    if (grpc_error_get_int(op->disconnect_with_error,
                           GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (chand->disconnect_error() == GRPC_ERROR_NONE) {
        // Enter IDLE state.
        chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE,
                                          "channel entering IDLE", nullptr);
      }
      GRPC_ERROR_UNREF(op->disconnect_with_error);
    } else {
      // Disconnect.
      GPR_ASSERT(chand->disconnect_error_.Load(MemoryOrder::RELAXED) ==
                 GRPC_ERROR_NONE);
      chand->disconnect_error_.Store(op->disconnect_with_error,
                                     MemoryOrder::RELEASE);
      chand->UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
          UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
              New<LoadBalancingPolicy::TransientFailurePicker>(
                  GRPC_ERROR_REF(op->disconnect_with_error))));
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

void ChannelData::StartTransportOp(grpc_channel_element* elem,
                                   grpc_transport_op* op) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane combiner for remaining ops.
  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure,
                        ChannelData::StartTransportOpLocked, op,
                        grpc_combiner_scheduler(chand->combiner_)),
      GRPC_ERROR_NONE);
}

void ChannelData::GetChannelInfo(grpc_channel_element* elem,
                                 const grpc_channel_info* info) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  MutexLock lock(&chand->info_mu_);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json_.get());
  }
}

void ChannelData::AddQueuedPick(QueuedPick* pick,
                                grpc_polling_entity* pollent) {
  // Add call to queued picks list.
  pick->next = queued_picks_;
  queued_picks_ = pick;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
                                   grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued picks list by walking the list with a pointer to
  // the link being examined, so no special case is needed for the head.
  for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
       pick = &(*pick)->next) {
    if (*pick == to_remove) {
      *pick = to_remove->next;
      return;
    }
  }
}

RefCountedPtr<ConnectedSubchannel>
ChannelData::GetConnectedSubchannelInDataPlane(
    SubchannelInterface* subchannel) const {
  SubchannelWrapper* subchannel_wrapper =
      static_cast<SubchannelWrapper*>(subchannel);
  ConnectedSubchannel* connected_subchannel =
      subchannel_wrapper->connected_subchannel_in_data_plane();
  if (connected_subchannel == nullptr) return nullptr;
  return connected_subchannel->Ref();
}

void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
  auto* chand = static_cast<ChannelData*>(arg);
  if (chand->resolving_lb_policy_ != nullptr) {
    chand->resolving_lb_policy_->ExitIdleLocked();
  } else {
    chand->CreateResolvingLoadBalancingPolicyLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
}

grpc_connectivity_state ChannelData::CheckConnectivityState(
    bool try_to_connect) {
  grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
                                           grpc_combiner_scheduler(combiner_)),
                       GRPC_ERROR_NONE);
  }
  return out;
}

//
// CallData implementation
//

// Retries support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
//
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

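// For reference, the retry behavior described above is configured per-method
// in the service config.  A sketch of such a config (field values here are
// hypothetical; see gRFC A6 for the full specification):
//
//   {
//     "methodConfig": [ {
//       "name": [ { "service": "example.EchoService" } ],
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": [ "UNAVAILABLE" ]
//       }
//     } ]
//   }
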
CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args.call_stack, args.call_combiner,
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      lb_call_state_(this),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      enable_retries_(chand.enable_retries()),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_
        ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
  }
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

grpc_error* CallData::Init(grpc_call_element* elem,
                           const grpc_call_element_args* args) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) CallData(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

void CallData::Destroy(grpc_call_element* elem,
                       const grpc_call_final_info* final_info,
                       grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
    calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
    then_schedule_closure = nullptr;
  }
  calld->~CallData();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call_ == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->subchannel_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's data plane mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing data plane mutex to perform pick",
              chand, calld);
    }
    PickSubchannel(elem, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}

void CallData::SetPollent(grpc_call_element* elem,
                          grpc_polling_entity* pollent) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// send op data caching
//

void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}

void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, this, idx);
  }
  send_messages_[idx]->Destroy();
}

void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

void CallData::FreeCachedSendOpDataAfterCommit(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (retry_state->completed_send_initial_metadata) {
    FreeCachedSendInitialMetadata(chand);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    FreeCachedSendMessage(chand, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    FreeCachedSendTrailingMetadata(chand);
  }
}

void CallData::FreeCachedSendOpDataForCompletedBatch(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (batch_data->batch.send_initial_metadata) {
    FreeCachedSendInitialMetadata(chand);
  }
  if (batch_data->batch.send_message) {
    FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    FreeCachedSendTrailingMetadata(chand);
  }
}

//
// LB recv_trailing_metadata_ready handling
//

void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
    void* arg, grpc_error* error) {
  CallData* calld = static_cast<CallData*>(arg);
  // Invoke callback to LB policy.
  Metadata trailing_metadata(calld, calld->recv_trailing_metadata_);
  calld->lb_recv_trailing_metadata_ready_(
      calld->lb_recv_trailing_metadata_ready_user_data_, error,
      &trailing_metadata, &calld->lb_call_state_);
  // Chain to original callback.
  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
                   GRPC_ERROR_REF(error));
}

void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
    grpc_transport_stream_op_batch* batch) {
  if (lb_recv_trailing_metadata_ready_ != nullptr) {
    recv_trailing_metadata_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
                      grpc_schedule_on_exec_ctx);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
}

//
// pending_batches management
//

size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

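// For example, a batch that contains both send_initial_metadata and
// send_message lands in slot 0, because send_initial_metadata is checked
// first above; each op type gets its own slot only when it arrives in its
// own batch.  The surface never starts two batches with the same leading op
// concurrently, which is why one slot per op type suffices.
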
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
                                 grpc_transport_stream_op_batch* batch) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (enable_retries_) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      pending_send_initial_metadata_ = true;
      bytes_buffered_for_retry_ += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      pending_send_message_ = true;
      bytes_buffered_for_retry_ +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = true;
    }
    if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                     chand->per_rpc_retry_buffer_size())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, this);
      }
      SubchannelCallRetryState* retry_state =
          subchannel_call_ == nullptr ? nullptr
                                      : static_cast<SubchannelCallRetryState*>(
                                            subchannel_call_->GetParentData());
      RetryCommit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (num_attempts_completed_ == 0) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, this);
        }
        enable_retries_ = false;
      }
    }
  }
}

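// Worked example of the buffering check above (sizes are illustrative):
// with a 256 KiB per-RPC retry buffer, a call that has already cached
// 200 KiB of send_message data and then buffers another 100 KiB exceeds
// the limit.  The call is committed, and if no attempt has been started
// yet, retries are disabled for it entirely.
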
void CallData::PendingBatchClear(PendingBatch* pending) {
  if (enable_retries_) {
    if (pending->batch->send_initial_metadata) {
      pending_send_initial_metadata_ = false;
    }
    if (pending->batch->send_message) {
      pending_send_message_ = false;
    }
    if (pending->batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = false;
    }
  }
  pending->batch = nullptr;
}

void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
                                      PendingBatch* pending) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
                                                grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* subchannel_call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  subchannel_call->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (enable_retries_) {
    StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, this, num_batches, subchannel_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      PendingBatchClear(pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

template <typename Predicate>
CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
                                                   const char* log_message,
                                                   Predicate predicate) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
                this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

void CallData::RetryCommit(grpc_call_element* elem,
                           SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
  }
  if (retry_state != nullptr) {
    FreeCachedSendOpDataAfterCommit(elem, retry_state);
  }
}

void CallData::DoRetry(grpc_call_element* elem,
                       SubchannelCallRetryState* retry_state,
                       grpc_millis server_pushback_ms) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(method_params_ != nullptr);
  const auto* retry_policy = method_params_->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call.
  subchannel_call_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      retry_backoff_.Init(
          BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_->NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}

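// Worked example of the delay computation above (values are illustrative):
// with initialBackoff=100ms, backoffMultiplier=2, and maxBackoff=1s, the
// nominal delays before attempts 2, 3, and 4 are roughly 100ms, 200ms, and
// 400ms, each randomized by the RETRY_BACKOFF_JITTER fraction.  A valid
// server push-back value bypasses this computation entirely and is used as
// the delay directly.
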
bool CallData::MaybeRetry(grpc_call_element* elem,
                          SubchannelCallBatchData* batch_data,
                          grpc_status_code status,
                          grpc_mdelem* server_pushback_md) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Get retry policy.
  if (method_params_ == nullptr) return false;
  const auto* retry_policy = method_params_->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  SubchannelCallRetryState* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<SubchannelCallRetryState*>(
        batch_data->subchannel_call->GetParentData());
    if (retry_state->retry_dispatched) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                this);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (retry_throttle_data_ != nullptr) {
      retry_throttle_data_->RecordSuccess();
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              this, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (retry_throttle_data_ != nullptr &&
      !retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++num_attempts_completed_;
  if (num_attempts_completed_ >= retry_policy->max_attempts) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              this, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (cancel_error_ != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, this);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, this);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, this, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  DoRetry(elem, retry_state, server_pushback_ms);
  return true;
}

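// For reference, server push-back arrives in trailing metadata (an
// illustrative sketch, per gRFC A6):
//
//   grpc-retry-pushback-ms: 2000   -> wait at least 2000 ms before retrying
//   grpc-retry-pushback-ms: -1     -> do not retry at all
//
// The "-1" case is handled above because grpc_parse_slice_to_uint32() fails
// on any negative or non-numeric value, which causes MaybeRetry() to give up.
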
//
// CallData::SubchannelCallBatchData
//

CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
    grpc_call_element* elem, int refcount, bool set_on_complete) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
                                                     set_on_complete);
}

CallData::SubchannelCallBatchData::SubchannelCallBatchData(
    grpc_call_element* elem, CallData* calld, int refcount,
    bool set_on_complete)
    : elem(elem), subchannel_call(calld->subchannel_call_) {
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          calld->subchannel_call_->GetParentData());
  batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&refs, refcount);
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
                      grpc_schedule_on_exec_ctx);
    batch.on_complete = &on_complete;
  }
  GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
}

void CallData::SubchannelCallBatchData::Destroy() {
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
  if (batch.send_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
  }
  if (batch.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
  }
  if (batch.recv_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
  }
  if (batch.recv_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
  }
  subchannel_call.reset();
  CallData* calld = static_cast<CallData*>(elem->call_data);
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
}

//
// recv_initial_metadata callback handling
//

void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      batch_data->elem, "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  grpc_metadata_batch_move(
      &retry_state->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  calld->MaybeClearPendingBatch(batch_data->elem, pending);
  batch_data->Unref();
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}

void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      calld->StartInternalRecvTrailingMetadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner_,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  calld->InvokeRecvInitialMetadataCallback(batch_data, error);
}

//
// recv_message callback handling
//

void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
  // Find pending op.
  PendingBatch* pending = calld->PendingBatchFind(
      batch_data->elem, "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  *pending->batch->payload->recv_message.recv_message =
      std::move(retry_state->recv_message);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  calld->MaybeClearPendingBatch(batch_data->elem, pending);
  batch_data->Unref();
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}

void CallData::RecvMessageReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  ++retry_state->completed_recv_message_count;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready after retry dispatched");
    return;
  }
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(
          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
          !retry_state->completed_recv_trailing_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    retry_state->recv_message_ready_deferred_batch = batch_data;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      calld->StartInternalRecvTrailingMetadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  calld->InvokeRecvMessageCallback(batch_data, error);
}

//
// recv_trailing_metadata handling
//

void CallData::GetCallStatus(grpc_call_element* elem,
                             grpc_metadata_batch* md_batch, grpc_error* error,
                             grpc_status_code* status,
                             grpc_mdelem** server_pushback_md) {
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
  } else {
    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
    *status =
        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
    if (server_pushback_md != nullptr &&
        md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
    }
  }
  GRPC_ERROR_UNREF(error);
}

void CallData::AddClosureForRecvTrailingMetadataReady(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    grpc_error* error, CallCombinerClosureList* closures) {
  // Find pending batch.
  PendingBatch* pending = PendingBatchFind(
      elem, "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  grpc_metadata_batch_move(
      &retry_state->recv_trailing_metadata,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  MaybeClearPendingBatch(elem, pending);
}

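// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures.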
void CallData::AddClosuresForDeferredRecvCallbacks(
    SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
    CallCombinerClosureList* closures) {
  if (batch_data->batch.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                        InvokeRecvInitialMetadataCallback,
                        retry_state->recv_initial_metadata_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_initial_metadata_ready,
                    retry_state->recv_initial_metadata_error,
                    "resuming recv_initial_metadata_ready");
      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
                        InvokeRecvMessageCallback,
                        retry_state->recv_message_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_message_ready,
                    retry_state->recv_message_error,
                    "resuming recv_message_ready");
      retry_state->recv_message_ready_deferred_batch = nullptr;
    }
  }
}

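// Returns true if any op in the batch was not yet started.
// Only looks at send ops, since recv ops are always started immediately.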
bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
                                       SubchannelCallRetryState* retry_state) {
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !retry_state->started_send_initial_metadata) {
    return true;
  }
  if (pending->batch->send_message &&
      retry_state->started_send_message_count < send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !retry_state->started_send_trailing_metadata) {
    return true;
  }
  return false;
}

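// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.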
void CallData::AddClosuresToFailUnstartedPendingBatches(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
    grpc_error* error, CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    if (PendingBatchIsUnstarted(pending, retry_state)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, this, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      MaybeClearPendingBatch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

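// Runs necessary closures upon completion of a call attempt.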
void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                           grpc_error* error) {
  grpc_call_element* elem = batch_data->elem;
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  AddClosureForRecvTrailingMetadataReady(elem, batch_data,
                                         GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
                                           GRPC_ERROR_REF(error), &closures);
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
  GRPC_ERROR_UNREF(error);
}

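// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.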
void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  retry_state->completed_recv_trailing_metadata = true;
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  grpc_metadata_batch* md_batch =
      batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
  calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
                       &server_pushback_md);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
            calld, grpc_status_code_to_string(status));
  }
  // Check if we should retry.
  if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.
    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
      batch_data->Unref();
      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
    }
    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
      batch_data->Unref();
      GRPC_ERROR_UNREF(retry_state->recv_message_error);
    }
    batch_data->Unref();
    return;
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Run any necessary closures.
  calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}

//
// on_complete callback handling
//

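// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.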
void CallData::AddClosuresForCompletedPendingBatch(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state, grpc_error* error,
    CallCombinerClosureList* closures) {
  PendingBatch* pending = PendingBatchFind(
      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // subchannel batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_data->batch.send_initial_metadata ==
                   batch->send_initial_metadata &&
               batch_data->batch.send_message == batch->send_message &&
               batch_data->batch.send_trailing_metadata ==
                   batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  MaybeClearPendingBatch(elem, pending);
}

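// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
// StartRetriableSubchannelBatches().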
void CallData::AddClosuresForReplayOrPendingSendOps(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  bool have_pending_send_message_ops =
      retry_state->started_send_message_count < send_messages_.size();
  bool have_pending_send_trailing_metadata_op =
      seen_send_trailing_metadata_ &&
      !retry_state->started_send_trailing_metadata;
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      PendingBatch* pending = &pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              chand, this);
    }
    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
                      StartRetriableSubchannelBatches, elem,
                      grpc_schedule_on_exec_ctx);
    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
                  "starting next batch for send_* op(s)");
  }
}

void CallData::OnComplete(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  // Update bookkeeping in retry_state.
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!retry_state->retry_dispatched) {
    // Add closure for the completed pending batch, if any.
    calld->AddClosuresForCompletedPendingBatch(
        elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the subchannel call.
    if (!retry_state->completed_recv_trailing_metadata) {
      calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
                                                  &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.
  --calld->num_pending_retriable_subchannel_send_batches_;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches_ == 0;
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
  }
}

//
// subchannel batch construction
//

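// Helper function used to start a batch on the subchannel call from
// within the call combiner.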
void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* subchannel_call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  subchannel_call->StartTransportStreamOpBatch(batch);
}

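// Adds a closure to closures that will execute batch in the call combiner.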
void CallData::AddClosureForSubchannelBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
    CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  batch->handler_private.extra_arg = subchannel_call_.get();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    char* batch_str = grpc_transport_stream_op_batch_string(batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
            this, batch_str);
    gpr_free(batch_str);
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_subchannel_batch");
}

void CallData::AddRetriableSendInitialMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
                                             &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-retry-attempts header.
  retry_state->send_initial_metadata_storage =
      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
          sizeof(grpc_linked_mdelem) *
          (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
  grpc_metadata_batch_copy(&send_initial_metadata_,
                           &retry_state->send_initial_metadata,
                           retry_state->send_initial_metadata_storage);
  if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
                       .grpc_previous_rpc_attempts != nullptr)) {
    grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
  }
  if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
    grpc_mdelem retry_md = grpc_mdelem_create(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[num_attempts_completed_ - 1], nullptr);
    grpc_error* error = grpc_metadata_batch_add_tail(
        &retry_state->send_initial_metadata,
        &retry_state
             ->send_initial_metadata_storage[send_initial_metadata_.list.count],
        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_string(error));
      GPR_ASSERT(false);
    }
  }
  retry_state->started_send_initial_metadata = true;
  batch_data->batch.send_initial_metadata = true;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
      &retry_state->send_initial_metadata;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
      send_initial_metadata_flags_;
  batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
}

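// Adds retriable send_message op to batch_data.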
void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
                                         SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            chand, this, retry_state->started_send_message_count);
  }
  ByteStreamCache* cache =
      send_messages_[retry_state->started_send_message_count];
  ++retry_state->started_send_message_count;
  retry_state->send_message.Init(cache);
  batch_data->batch.send_message = true;
  batch_data->batch.payload->send_message.send_message.reset(
      retry_state->send_message.get());
}

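// Adds retriable send_trailing_metadata op to batch_data.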
void CallData::AddRetriableSendTrailingMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  retry_state->send_trailing_metadata_storage =
      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
          sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
  grpc_metadata_batch_copy(&send_trailing_metadata_,
                           &retry_state->send_trailing_metadata,
                           retry_state->send_trailing_metadata_storage);
  retry_state->started_send_trailing_metadata = true;
  batch_data->batch.send_trailing_metadata = true;
  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
      &retry_state->send_trailing_metadata;
}

void CallData::AddRetriableRecvInitialMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  retry_state->started_recv_initial_metadata = true;
  batch_data->batch.recv_initial_metadata = true;
  grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
      &retry_state->recv_initial_metadata;
  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
      &retry_state->trailing_metadata_available;
  GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                    RecvInitialMetadataReady, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &retry_state->recv_initial_metadata_ready;
}

void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data) {
  ++retry_state->started_recv_message_count;
  batch_data->batch.recv_message = true;
  batch_data->batch.payload->recv_message.recv_message =
      &retry_state->recv_message;
  GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
                    batch_data, grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_message.recv_message_ready =
      &retry_state->recv_message_ready;
}

void CallData::AddRetriableRecvTrailingMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  retry_state->started_recv_trailing_metadata = true;
  batch_data->batch.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
      &retry_state->recv_trailing_metadata;
  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
      &retry_state->collect_stats;
  GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
                    RecvTrailingMetadataReady, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_trailing_metadata
      .recv_trailing_metadata_ready =
      &retry_state->recv_trailing_metadata_ready;
  MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      &batch_data->batch);
}

void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            chand, this);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the subchannel
  // batch returns, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  SubchannelCallBatchData* batch_data =
      SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
  AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
  retry_state->recv_trailing_metadata_internal_batch = batch_data;
  // Note: This will release the call combiner.
  subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
}

// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops.  Otherwise, returns nullptr.
CallData::SubchannelCallBatchData*
CallData::MaybeCreateSubchannelBatchForReplay(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  SubchannelCallBatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (seen_send_initial_metadata_ &&
      !retry_state->started_send_initial_metadata &&
      !pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              chand, this);
    }
    replay_batch_data =
        SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (retry_state->started_send_message_count < send_messages_.size() &&
      retry_state->started_send_message_count ==
          retry_state->completed_send_message_count &&
      !pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              chand, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    }
    AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (seen_send_trailing_metadata_ &&
      retry_state->started_send_message_count == send_messages_.size() &&
      !retry_state->started_send_trailing_metadata &&
      !pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              chand, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    }
    AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
  }
  return replay_batch_data;
}

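// Adds subchannel batches for pending batches to closures.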
void CallData::AddSubchannelBatchesForPendingBatches(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             send_messages_.size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by StartInternalRecvTrailingMetadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
                        nullptr))) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          retry_state->recv_trailing_metadata_internal_batch->Unref();
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (method_params_ == nullptr ||
        method_params_->retry_policy() == nullptr || retry_committed_) {
      // TODO(roth): We should probably call
      // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
      AddClosureForSubchannelBatch(elem, batch, closures);
      PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
        elem, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      AddRetriableSendInitialMetadataOp(retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      AddRetriableSendMessageOp(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      AddRetriableRecvMessageOp(retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
    }
    AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (num_pending_retriable_subchannel_send_batches_ == 0) {
        GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
      }
      ++num_pending_retriable_subchannel_send_batches_;
    }
  }
}

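// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.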
void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          calld->subchannel_call_->GetParentData());
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  SubchannelCallBatchData* replay_batch_data =
      calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
                                        &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches_;
  }
  // Now add pending batches.
  calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call_.get());
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner_);
}

//
// LB pick
//

void CallData::CreateSubchannelCall(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t parent_data_size =
      enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
  SubchannelCall::Args call_args = {
      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_, parent_data_size};
  grpc_error* error = GRPC_ERROR_NONE;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, this, subchannel_call_.get(), grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    PendingBatchesFail(elem, error, YieldCallCombiner);
  } else {
    if (parent_data_size > 0) {
      new (subchannel_call_->GetParentData())
          SubchannelCallRetryState(call_context_);
    }
    PendingBatchesResume(elem);
  }
}

void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&pick_closure_, error);
}

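// Invoked when a pick is completed, on both success or failure.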
void CallData::PickDone(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
              calld, grpc_error_string(error));
    }
    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  calld->CreateSubchannelCall(elem);
}

// A class to handle the call combiner cancellation callback for a
// queued pick.
class CallData::QueuedPickCanceller {
 public:
  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_schedule_on_exec_ctx);
    calld->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<QueuedPickCanceller*>(arg);
    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
    auto* calld = static_cast<CallData*>(self->elem_->call_data);
    MutexLock lock(chand->data_plane_mu());
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: cancelling queued pick: "
              "error=%s self=%p calld->pick_canceller=%p",
              chand, calld, grpc_error_string(error), self,
              calld->pick_canceller_);
    }
    if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
      // Remove pick from list of queued picks.
      calld->RemoveCallFromQueuedPicksLocked(self->elem_);
      // Fail pending batches on the call.
      calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
                                YieldCallCombinerIfPendingBatchesFound);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
    Delete(self);
  }

  grpc_call_element* elem_;
  grpc_closure closure_;
};

void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
            chand, this);
  }
  chand->RemoveQueuedPick(&pick_, pollent_);
  pick_queued_ = false;
  // Lame the call combiner canceller.
  pick_canceller_ = nullptr;
}

void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
            this);
  }
  pick_queued_ = true;
  pick_.elem = elem;
  chand->AddQueuedPick(&pick_, pollent_);
  // Register call combiner cancellation callback.
  pick_canceller_ = New<QueuedPickCanceller>(elem);
}

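// Applies service config to the call.  Must be invoked once we know
// that the resolver has returned results to the channel.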
void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, this);
  }
  // Store a ref to the service_config in service_config_call_data_.  Also,
  // save a pointer to this in the call_context so that all future filters
  // can access it.
  service_config_call_data_ =
      ServiceConfig::CallData(chand->service_config(), path_);
  if (service_config_call_data_.service_config() != nullptr) {
    call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
        &service_config_call_data_;
    method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
        service_config_call_data_.GetMethodParsedConfig(
            internal::ClientChannelServiceConfigParser::ParserIndex()));
  }
  retry_throttle_data_ = chand->retry_throttle_data();
  if (method_params_ != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
      const grpc_millis per_method_deadline =
          grpc_cycle_counter_to_millis_round_up(call_start_time_) +
          method_params_->timeout();
      if (per_method_deadline < deadline_) {
        deadline_ = per_method_deadline;
        grpc_deadline_state_reset(elem, deadline_);
      }
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    uint32_t* send_initial_metadata_flags =
        &pending_batches_[0]
             .batch->payload->send_initial_metadata.send_initial_metadata_flags;
    if (method_params_->wait_for_ready().has_value() &&
        !(*send_initial_metadata_flags &
          GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
      if (method_params_->wait_for_ready().value()) {
        *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      } else {
        *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
    enable_retries_ = false;
  }
}

void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Apply service config data to the call only once, and only if the
  // channel has the data available.
  if (GPR_LIKELY(chand->received_service_config_data() &&
                 !service_config_applied_)) {
    service_config_applied_ = true;
    ApplyServiceConfigToCallLocked(elem);
  }
}

const char* PickResultTypeName(
    LoadBalancingPolicy::PickResult::ResultType type) {
  switch (type) {
    case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
      return "COMPLETE";
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      return "QUEUE";
    case LoadBalancingPolicy::PickResult::PICK_FAILED:
      return "FAILED";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

void CallData::PickSubchannel(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  bool pick_complete;
  {
    MutexLock lock(chand->data_plane_mu());
    pick_complete = calld->PickSubchannelLocked(elem, &error);
  }
  if (pick_complete) {
    PickDone(elem, error);
    GRPC_ERROR_UNREF(error);
  }
}

bool CallData::PickSubchannelLocked(grpc_call_element* elem,
                                    grpc_error** error) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(connected_subchannel_ == nullptr);
  GPR_ASSERT(subchannel_call_ == nullptr);
  // The picker being null means that the channel is currently in IDLE state.
  // The incoming call will make the channel exit IDLE.
  if (chand->picker() == nullptr) {
    // Bounce into the control plane combiner to exit IDLE.
    chand->CheckConnectivityState(/*try_to_connect=*/true);
    // Queue the pick, so that it will be attempted once the channel
    // becomes connected.
    AddCallToQueuedPicksLocked(elem);
    return false;
  }
  // Apply service config to call if needed.
  MaybeApplyServiceConfigToCallLocked(elem);
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch.  The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by GetBatchIndex() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry?  We don't want
  // the new metadata to be added twice.  We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.call_state = &lb_call_state_;
  Metadata initial_metadata(
      this,
      seen_send_initial_metadata_
          ? &send_initial_metadata_
          : pending_batches_[0]
                .batch->payload->send_initial_metadata.send_initial_metadata);
  pick_args.initial_metadata = &initial_metadata;
  // Grab initial metadata flags so that we can check later if the call has
  // wait_for_ready enabled.
  const uint32_t send_initial_metadata_flags =
      seen_send_initial_metadata_ ? send_initial_metadata_flags_
                                  : pending_batches_[0]
                                        .batch->payload->send_initial_metadata
                                        .send_initial_metadata_flags;
  // Attempt pick.
  auto result = chand->picker()->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
            chand, this, PickResultTypeName(result.type),
            result.subchannel.get(), grpc_error_string(result.error));
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
      // If we're shutting down, fail all RPCs.
      grpc_error* disconnect_error = chand->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(result.error);
        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
        *error = GRPC_ERROR_REF(disconnect_error);
        return true;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        // Retry if appropriate; otherwise, fail.
        grpc_status_code status = GRPC_STATUS_OK;
        grpc_error_get_status(result.error, deadline_, &status, nullptr,
                              nullptr, nullptr);
        const bool retried = enable_retries_ &&
                             MaybeRetry(elem, nullptr /* batch_data */, status,
                                        nullptr /* server_pushback_md */);
        if (!retried) {
          grpc_error* new_error =
              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to pick subchannel", &result.error, 1);
          GRPC_ERROR_UNREF(result.error);
          *error = new_error;
        }
        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
        return !retried;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
      return false;
    default:  // PICK_COMPLETE
      if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Call dropped by load balancing policy");
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
            chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      lb_recv_trailing_metadata_ready_user_data_ =
          result.recv_trailing_metadata_ready_user_data;
      *error = result.error;
      return true;
  }
}

}  // namespace grpc_core

/*************************************************************************
 * EXPORTED SYMBOLS
 */

using grpc_core::CallData;
using grpc_core::ChannelData;

const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    CallData::SetPollent,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    ChannelData::GetChannelInfo,
    "client-channel",
};

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->CheckConnectivityState(try_to_connect);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->NumExternalConnectivityWatchers();
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->AddExternalConnectivityWatcher(pollent, state, closure,
                                               watcher_timer_init);
}

grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  return calld->subchannel_call();
}