/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
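// For reference, (256 << 10) == 262144 bytes == 256 KiB.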

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
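// A jitter of 0.2 means each computed retry backoff interval is randomly
// perturbed by up to roughly +/-20% (see BackOff in
// "src/core/lib/backoff/backoff.h").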

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

namespace grpc_core {

TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");

namespace {

//
// ChannelData definition
//

class ChannelData {
 public:
  struct QueuedPick {
    grpc_call_element* elem;
    QueuedPick* next = nullptr;
  };

  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  Mutex* data_plane_mu() const { return &data_plane_mu_; }

  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
  void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);

  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }

  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
      SubchannelInterface* subchannel) const;

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    // Will delete itself.
    New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
                                     watcher_timer_init);
  }
  int NumExternalConnectivityWatchers() const {
    return external_connectivity_watcher_list_.size();
  }

 private:
  class SubchannelWrapper;
  class ClientChannelControlHelper;

  class ExternalConnectivityWatcher {
   public:
    class WatcherList {
     public:
      WatcherList() { gpr_mu_init(&mu_); }
      ~WatcherList() { gpr_mu_destroy(&mu_); }

      int size() const;
      ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
      void Add(ExternalConnectivityWatcher* watcher);
      void Remove(const ExternalConnectivityWatcher* watcher);

     private:
      // head_ is guarded by a mutex, since the size() method needs to
      // iterate over the list, and it's called from the C-core API
      // function grpc_channel_num_external_connectivity_watchers(), which
      // is synchronous and therefore cannot run in the combiner.
      mutable gpr_mu mu_;
      ExternalConnectivityWatcher* head_ = nullptr;
    };

    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher();

   private:
    static void OnWatchCompleteLocked(void* arg, grpc_error* error);
    static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    grpc_closure my_closure_;
    ExternalConnectivityWatcher* next_ = nullptr;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  void UpdateStateAndPickerLocked(
      grpc_connectivity_state state, const char* reason,
      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker);

  void UpdateServiceConfigLocked(
      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
      RefCountedPtr<ServiceConfig> service_config);

  void CreateResolvingLoadBalancingPolicyLocked();

  void DestroyResolvingLoadBalancingPolicyLocked();

  static bool ProcessResolverResultLocked(
      void* arg, const Resolver::Result& result, const char** lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
      grpc_error** service_config_error);

  grpc_error* DoPingLocked(grpc_transport_op* op);

  static void StartTransportOpLocked(void* arg, grpc_error* ignored);

  static void TryToConnectLocked(void* arg, grpc_error* error_ignored);

  void ProcessLbPolicy(
      const Resolver::Result& resolver_result,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      UniquePtr<char>* lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  UniquePtr<char> server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields used in the data plane.  Guarded by data_plane_mu.
  //
  mutable Mutex data_plane_mu_;
  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
  QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
  // Data from service config.
  bool received_service_config_data_ = false;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  RefCountedPtr<ServiceConfig> service_config_;

  //
  // Fields used in the control plane.  Guarded by combiner.
  //
  grpc_combiner* combiner_;
  grpc_pollset_set* interested_parties_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
  grpc_connectivity_state_tracker state_tracker_;
  ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
  UniquePtr<char> health_check_service_name_;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  bool received_first_resolver_result_ = false;
  // The number of SubchannelWrapper instances referencing a given Subchannel.
  Map<Subchannel*, int> subchannel_refcount_map_;
  // The set of SubchannelWrappers that currently exist.
  // No need to hold a ref, since the map is updated in the control-plane
  // combiner when the SubchannelWrappers are created and destroyed.
  // TODO(roth): We really want to use a set here, not a map.  Since we don't
  // currently have a set implementation, we use a map and ignore the value.
  Map<SubchannelWrapper*, bool> subchannel_wrappers_;
  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
  // Updates are queued here in the control plane combiner and then applied
  // under the data plane mutex when the picker is updated.
  Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
      RefCountedPtrLess<SubchannelWrapper>>
      pending_subchannel_updates_;

  //
  // Fields accessed from both data plane mutex and control plane combiner.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;
};
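
// Illustrative sketch of the synchronization model above (hypothetical
// caller code, not part of the filter itself): a data-plane pick holds
// data_plane_mu_ only long enough to consult the current picker, e.g.
//
//   MutexLock lock(chand->data_plane_mu());
//   LoadBalancingPolicy::PickResult result = chand->picker()->Pick(args);
//
// while control-plane mutations (picker swaps, resolver updates, subchannel
// bookkeeping) run as closures scheduled on combiner_.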

//
// CallData definition
//

class CallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }

  // Invoked by channel for queued picks once resolver results are available.
  void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // Invoked by channel for queued picks when the picker is updated.
  static void PickSubchannel(void* arg, grpc_error* error);

  // Helper function for performing a pick while holding the data plane
  // mutex.  Returns true if the pick is complete, in which case the caller
  // must invoke PickDone() or AsyncPickDone() with the returned error.
  bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error);

  // Schedules a callback to process the completed pick.  The callback
  // will not run until after this method returns.
  void AsyncPickDone(grpc_call_element* elem, grpc_error* error);

 private:
  class QueuedPickCanceller;

  class Metadata : public LoadBalancingPolicy::MetadataInterface {
   public:
    Metadata(CallData* calld, grpc_metadata_batch* batch)
        : calld_(calld), batch_(batch) {}

    void Add(StringView key, StringView value) override {
      grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
          calld_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
      linked_mdelem->md = grpc_mdelem_from_slices(
          grpc_core::ExternallyManagedSlice(key.data(), key.size()),
          grpc_core::ExternallyManagedSlice(value.data(), value.size()));
      GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
                 GRPC_ERROR_NONE);
    }

    Iterator Begin() const override {
      static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(Iterator),
                    "iterator size too large");
      return reinterpret_cast<Iterator>(batch_->list.head);
    }
    bool IsEnd(Iterator it) const override {
      return reinterpret_cast<grpc_linked_mdelem*>(it) == nullptr;
    }
    void Next(Iterator* it) const override {
      *it = reinterpret_cast<Iterator>(
          reinterpret_cast<grpc_linked_mdelem*>(*it)->next);
    }
    StringView Key(Iterator it) const override {
      return StringView(
          GRPC_MDKEY(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
    }
    StringView Value(Iterator it) const override {
      return StringView(
          GRPC_MDVALUE(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
    }

    void Erase(Iterator* it) override {
      grpc_linked_mdelem* linked_mdelem =
          reinterpret_cast<grpc_linked_mdelem*>(*it);
      *it = reinterpret_cast<Iterator>(linked_mdelem->next);
      grpc_metadata_batch_remove(batch_, linked_mdelem);
    }

   private:
    CallData* calld_;
    grpc_metadata_batch* batch_;
  };
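
  // A minimal usage sketch for the Metadata view above (hypothetical
  // caller, e.g. an LB policy walking trailing metadata):
  //
  //   for (auto it = metadata->Begin(); !metadata->IsEnd(it);
  //        metadata->Next(&it)) {
  //     StringView key = metadata->Key(it);
  //     StringView value = metadata->Value(it);
  //   }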

  class LbCallState : public LoadBalancingPolicy::CallState {
   public:
    explicit LbCallState(CallData* calld) : calld_(calld) {}

    void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }

    const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
        override {
      if (calld_->backend_metric_data_ == nullptr) {
        grpc_linked_mdelem* md = calld_->recv_trailing_metadata_->idx.named
                                     .x_endpoint_load_metrics_bin;
        if (md != nullptr) {
          calld_->backend_metric_data_ =
              ParseBackendMetricData(GRPC_MDVALUE(md->md), calld_->arena_);
        }
      }
      return calld_->backend_metric_data_;
    }

   private:
    CallData* calld_;
  };

  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount.  If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(grpc_call_element* elem,
                                           int refcount, bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }

    SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
                            int refcount, bool set_on_complete);
    // All dtor code must be added in `Destroy()`. This is because we may
    // call closures in `SubchannelCallBatchData` after they are unrefed by
    // `Unref()`, and msan would complain about accessing this class
    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }
    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RefCountedPtr<SubchannelCall> subchannel_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };
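
  // Assumed lifetime pattern, inferred from the interface above: Create()
  // is passed a refcount equal to the number of callbacks that will be
  // intercepted for the batch, each intercepted callback calls Unref()
  // when it finishes, and Destroy() therefore runs only after the last
  // callback completes.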

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above.  That
    //       would save space but would also result in a data race, because
    //       the compiler would generate a 2-byte store that overwrites the
    //       metadata bitfields when this field is set.
    bool retry_dispatched : 1;
  };
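
  // On each new call attempt, a fresh SubchannelCallRetryState starts with
  // all started_*/completed_* flags false; the replay logic compares these
  // flags (and the *_message_count fields) against the ops cached in
  // CallData to decide which ops must be re-sent on the new attempt.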

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch.  If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached;
  };

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata(ChannelData* chand);
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(ChannelData* chand, size_t idx);
  void FreeCachedSendTrailingMetadata(ChannelData* chand);
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
                                       SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data.  Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume(grpc_call_element* elem);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(grpc_call_element* elem,
                                 const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(grpc_call_element* elem,
                   SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
                  grpc_status_code status, grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
                     grpc_error* error, grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      grpc_error* error, CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      grpc_error* error, CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(grpc_call_element* elem,
                                 SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch.  This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that we know the call is over but the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops.  Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  void CreateSubchannelCall(grpc_call_element* elem);
  // Invoked when a pick is completed, on both success and failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks.
  void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
  // Adds the call to the channel's list of queued picks.
  void AddCallToQueuedPicksLocked(grpc_call_element* elem);
  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  void ApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  ServiceConfig::CallData service_config_call_data_;
  const ClientChannelMethodParsedConfig* method_params_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  ChannelData::QueuedPick pick_;
  bool pick_queued_ = false;
  bool service_config_applied_ = false;
  QueuedPickCanceller* pick_canceller_ = nullptr;
  LbCallState lb_call_state_;
  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  void (*lb_recv_trailing_metadata_ready_)(
      void* user_data, grpc_error* error,
      LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
      LoadBalancingPolicy::CallState* call_state) = nullptr;
  void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
  grpc_closure pick_closure_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  grpc_polling_entity* pollent_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  // TODO(roth): Restructure this to eliminate use of ManualConstructor.
  ManualConstructor<BackOff> retry_backoff_;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};
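
// Rough data-plane flow through the class above: a batch arriving from the
// surface enters StartTransportStreamOpBatch(), which attempts a pick via
// PickSubchannelLocked() under the channel's data-plane mutex; if no usable
// picker result is available yet, the call is parked on the channel's
// queued-picks list, and once a pick completes, PickDone() creates the
// subchannel call and resumes the pending batches.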

//
// ChannelData::SubchannelWrapper
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name and
// connected subchannel) from the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane combiner.
class ChannelData::SubchannelWrapper : public SubchannelInterface {
 public:
  SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                    UniquePtr<char> health_check_service_name)
      : SubchannelInterface(&grpc_client_channel_routing_trace),
        chand_(chand),
        subchannel_(subchannel),
        health_check_service_name_(std::move(health_check_service_name)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_);
    }
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      if (it == chand_->subchannel_refcount_map_.end()) {
        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
      }
      ++it->second;
    }
    chand_->subchannel_wrappers_[this] = true;
  }

  ~SubchannelWrapper() {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_);
    }
    chand_->subchannel_wrappers_.erase(this);
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
      --it->second;
      if (it->second == 0) {
        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
        chand_->subchannel_refcount_map_.erase(it);
      }
    }
    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }

  grpc_connectivity_state CheckConnectivityState() override {
    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
    grpc_connectivity_state connectivity_state =
        subchannel_->CheckConnectivityState(health_check_service_name_.get(),
                                            &connected_subchannel);
    MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
    return connectivity_state;
  }

  void WatchConnectivityState(
      grpc_connectivity_state initial_state,
      UniquePtr<ConnectivityStateWatcherInterface> watcher) override {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = New<WatcherWrapper>(std::move(watcher),
                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
                                          initial_state);
    subchannel_->WatchConnectivityState(
        initial_state,
        UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
        OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }

  void CancelConnectivityStateWatch(
      ConnectivityStateWatcherInterface* watcher) override {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
                                              it->second);
    watcher_map_.erase(it);
  }

  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  const grpc_channel_args* channel_args() override {
    return subchannel_->channel_args();
  }

  void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: subchannel wrapper %p: updating health check service "
              "name from \"%s\" to \"%s\"",
              chand_, this, health_check_service_name_.get(),
              health_check_service_name.get());
    }
    for (auto& p : watcher_map_) {
      WatcherWrapper*& watcher_wrapper = p.second;
      // Cancel the current watcher and create a new one using the new
      // health check service name.
      // TODO(roth): If there is not already an existing health watch
      // call for the new name, then the watcher will initially report
      // state CONNECTING.  If the LB policy is currently reporting
      // state READY, this may cause it to switch to CONNECTING before
      // switching back to READY.  This could cause a small delay for
      // RPCs being started on the channel.  If/when this becomes a
      // problem, we may be able to handle it by waiting for the new
      // watcher to report READY before we use it to replace the old one.
      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
      subchannel_->CancelConnectivityStateWatch(
          health_check_service_name_.get(), watcher_wrapper);
      watcher_wrapper = replacement;
      subchannel_->WatchConnectivityState(
          replacement->last_seen_state(),
          UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
          OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
              replacement));
    }
    // Save the new health check service name.
    health_check_service_name_ = std::move(health_check_service_name);
  }

  // Caller must be holding the control-plane combiner.
  ConnectedSubchannel* connected_subchannel() const {
    return connected_subchannel_.get();
  }

  // Caller must be holding the data-plane mutex.
  ConnectedSubchannel* connected_subchannel_in_data_plane() const {
    return connected_subchannel_in_data_plane_.get();
  }
  void set_connected_subchannel_in_data_plane(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
  }

 private:
  // Subchannel and SubchannelInterface have different interfaces for
  // their respective ConnectivityStateWatcherInterface classes.
  // The one in Subchannel updates the ConnectedSubchannel along with
  // the state, whereas the one in SubchannelInterface does not expose
  // the ConnectedSubchannel.
  //
  // This wrapper provides a bridge between the two.  It implements
  // Subchannel::ConnectivityStateWatcherInterface and wraps
  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
  // that was passed in by the LB policy.  We pass an instance of this
  // class to the underlying Subchannel, and when we get updates from
  // the subchannel, we pass those on to the wrapped watcher to return
  // the update to the LB policy.  This allows us to set the connected
  // subchannel before passing the result back to the LB policy.
  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
   public:
    WatcherWrapper(
        UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>
            watcher,
        RefCountedPtr<SubchannelWrapper> parent,
        grpc_connectivity_state initial_state)
        : watcher_(std::move(watcher)),
          parent_(std::move(parent)),
          last_seen_state_(initial_state) {}

    ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); }

    void Orphan() override { Unref(); }

    void OnConnectivityStateChange(
        grpc_connectivity_state new_state,
        RefCountedPtr<ConnectedSubchannel> connected_subchannel) override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: connectivity change for subchannel wrapper %p "
                "subchannel %p (connected_subchannel=%p state=%s); "
                "hopping into combiner",
                parent_->chand_, parent_.get(), parent_->subchannel_,
                connected_subchannel.get(),
                grpc_connectivity_state_name(new_state));
      }
      // Will delete itself.
      New<Updater>(Ref(), new_state, std::move(connected_subchannel));
    }

    grpc_pollset_set* interested_parties() override {
      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
          watcher_.get();
      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
      return watcher->interested_parties();
    }

    WatcherWrapper* MakeReplacement() {
      auto* replacement =
          New<WatcherWrapper>(std::move(watcher_), parent_, last_seen_state_);
      replacement_ = replacement;
      return replacement;
    }

    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }

   private:
    class Updater {
     public:
      Updater(RefCountedPtr<WatcherWrapper> parent,
              grpc_connectivity_state new_state,
              RefCountedPtr<ConnectedSubchannel> connected_subchannel)
          : parent_(std::move(parent)),
            state_(new_state),
            connected_subchannel_(std::move(connected_subchannel)) {
        GRPC_CLOSURE_INIT(
            &closure_, ApplyUpdateInControlPlaneCombiner, this,
            grpc_combiner_scheduler(parent_->parent_->chand_->combiner_));
        GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
      }

     private:
      static void ApplyUpdateInControlPlaneCombiner(void* arg,
                                                    grpc_error* error) {
        Updater* self = static_cast<Updater*>(arg);
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p: processing connectivity change in combiner "
                  "for subchannel wrapper %p subchannel %p "
                  "(connected_subchannel=%p state=%s): watcher=%p",
                  self->parent_->parent_->chand_, self->parent_->parent_.get(),
                  self->parent_->parent_->subchannel_,
                  self->connected_subchannel_.get(),
                  grpc_connectivity_state_name(self->state_),
                  self->parent_->watcher_.get());
        }
        // Ignore update if the parent WatcherWrapper has been replaced
        // since this callback was scheduled.
        if (self->parent_->watcher_ == nullptr) return;
        self->parent_->last_seen_state_ = self->state_;
        self->parent_->parent_->MaybeUpdateConnectedSubchannel(
            std::move(self->connected_subchannel_));
        self->parent_->watcher_->OnConnectivityStateChange(self->state_);
        Delete(self);
      }

      RefCountedPtr<WatcherWrapper> parent_;
      grpc_connectivity_state state_;
      RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
      grpc_closure closure_;
    };

    UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_;
    RefCountedPtr<SubchannelWrapper> parent_;
    grpc_connectivity_state last_seen_state_;
    WatcherWrapper* replacement_ = nullptr;
  };

  void MaybeUpdateConnectedSubchannel(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    // Update the connected subchannel only if the channel is not shutting
    // down.  This is because once the channel is shutting down, we
    // ignore picker updates from the LB policy, which means that
    // UpdateStateAndPickerLocked() will never process the entries
    // in chand_->pending_subchannel_updates_.  So we don't want to add
    // entries there that will never be processed, since that would
    // leave dangling refs to the channel and prevent its destruction.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (disconnect_error != GRPC_ERROR_NONE) return;
    // Not shutting down, so do the update.
    if (connected_subchannel_ != connected_subchannel) {
      connected_subchannel_ = std::move(connected_subchannel);
      // Record the new connected subchannel so that it can be updated
      // under the data plane mutex the next time the picker is updated.
      chand_->pending_subchannel_updates_[Ref(
          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
    }
  }

  ChannelData* chand_;
  Subchannel* subchannel_;
  UniquePtr<char> health_check_service_name_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WatcherWrapper that we passed to the underlying
  // subchannel.  This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
  Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
  // To be accessed only in the control plane combiner.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  // To be accessed only under the data plane mutex.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
};
1107
1108 //
1109 // ChannelData::ExternalConnectivityWatcher::WatcherList
1110 //
1111
1112 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
1113   MutexLock lock(&mu_);
1114   int count = 0;
1115   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
1116     ++count;
1117   }
1118   return count;
1119 }
1120
1121 ChannelData::ExternalConnectivityWatcher*
1122 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
1123     grpc_closure* on_complete) const {
1124   MutexLock lock(&mu_);
1125   ExternalConnectivityWatcher* w = head_;
1126   while (w != nullptr && w->on_complete_ != on_complete) {
1127     w = w->next_;
1128   }
1129   return w;
1130 }
1131
1132 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
1133     ExternalConnectivityWatcher* watcher) {
1134   GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
1135   MutexLock lock(&mu_);
1136   GPR_ASSERT(watcher->next_ == nullptr);
1137   watcher->next_ = head_;
1138   head_ = watcher;
1139 }
1140
1141 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
1142     const ExternalConnectivityWatcher* watcher) {
1143   MutexLock lock(&mu_);
1144   if (watcher == head_) {
1145     head_ = watcher->next_;
1146     return;
1147   }
1148   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
1149     if (w->next_ == watcher) {
1150       w->next_ = w->next_->next_;
1151       return;
1152     }
1153   }
1154   GPR_UNREACHABLE_CODE(return );
1155 }
1156
1157 //
1158 // ChannelData::ExternalConnectivityWatcher
1159 //
1160
1161 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
1162     ChannelData* chand, grpc_polling_entity pollent,
1163     grpc_connectivity_state* state, grpc_closure* on_complete,
1164     grpc_closure* watcher_timer_init)
1165     : chand_(chand),
1166       pollent_(pollent),
1167       state_(state),
1168       on_complete_(on_complete),
1169       watcher_timer_init_(watcher_timer_init) {
1170   grpc_polling_entity_add_to_pollset_set(&pollent_,
1171                                          chand_->interested_parties_);
1172   GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
1173   GRPC_CLOSURE_SCHED(
1174       GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
1175                         grpc_combiner_scheduler(chand_->combiner_)),
1176       GRPC_ERROR_NONE);
1177 }
1178
1179 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
1180   grpc_polling_entity_del_from_pollset_set(&pollent_,
1181                                            chand_->interested_parties_);
1182   GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1183                            "ExternalConnectivityWatcher");
1184 }
1185
1186 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
1187     void* arg, grpc_error* error) {
1188   ExternalConnectivityWatcher* self =
1189       static_cast<ExternalConnectivityWatcher*>(arg);
1190   grpc_closure* on_complete = self->on_complete_;
1191   self->chand_->external_connectivity_watcher_list_.Remove(self);
1192   Delete(self);
1193   GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
1194 }
1195
1196 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
1197     void* arg, grpc_error* ignored) {
1198   ExternalConnectivityWatcher* self =
1199       static_cast<ExternalConnectivityWatcher*>(arg);
1200   if (self->state_ == nullptr) {
1201     // Handle cancellation.
1202     GPR_ASSERT(self->watcher_timer_init_ == nullptr);
1203     ExternalConnectivityWatcher* found =
1204         self->chand_->external_connectivity_watcher_list_.Lookup(
1205             self->on_complete_);
1206     if (found != nullptr) {
1207       grpc_connectivity_state_notify_on_state_change(
1208           &found->chand_->state_tracker_, nullptr, &found->my_closure_);
1209     }
1210     Delete(self);
1211     return;
1212   }
1213   // New watcher.
1214   self->chand_->external_connectivity_watcher_list_.Add(self);
1215   // This assumes that the closure is scheduled on the ExecCtx scheduler
1216   // and that GRPC_CLOSURE_RUN will run the closure immediately.
1217   GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
1218   GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
1219                     grpc_combiner_scheduler(self->chand_->combiner_));
1220   grpc_connectivity_state_notify_on_state_change(
1221       &self->chand_->state_tracker_, self->state_, &self->my_closure_);
1222 }
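
// How an application typically drives this machinery through the public
// C API (sketch; assumes `channel`, `cq`, and `tag` already exist):
#if 0
grpc_connectivity_state st =
    grpc_channel_check_connectivity_state(channel, /*try_to_connect=*/1);
grpc_channel_watch_connectivity_state(
    channel, st, gpr_inf_future(GPR_CLOCK_REALTIME), cq, tag);
// A completion for `tag` is delivered on `cq` once the state differs
// from `st` (or the deadline expires); that surface call is what
// ultimately exercises the watcher logic above.
#endif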
1223
1224 //
1225 // ChannelData::ClientChannelControlHelper
1226 //
1227
1228 class ChannelData::ClientChannelControlHelper
1229     : public LoadBalancingPolicy::ChannelControlHelper {
1230  public:
1231   explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
1232     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1233   }
1234
1235   ~ClientChannelControlHelper() override {
1236     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1237                              "ClientChannelControlHelper");
1238   }
1239
1240   RefCountedPtr<SubchannelInterface> CreateSubchannel(
1241       const grpc_channel_args& args) override {
1242     bool inhibit_health_checking = grpc_channel_arg_get_bool(
1243         grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
1244     UniquePtr<char> health_check_service_name;
1245     if (!inhibit_health_checking) {
1246       health_check_service_name.reset(
1247           gpr_strdup(chand_->health_check_service_name_.get()));
1248     }
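    // Strip args that should not be inherited by subchannels:
    // health-check inhibition has already been consumed above, and the
    // channelz node is a pointer arg specific to this parent channel.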
1249     static const char* args_to_remove[] = {
1250         GRPC_ARG_INHIBIT_HEALTH_CHECKING,
1251         GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1252     };
1253     grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
1254         chand_->subchannel_pool_.get());
1255     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1256         &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
1257     Subchannel* subchannel =
1258         chand_->client_channel_factory_->CreateSubchannel(new_args);
1259     grpc_channel_args_destroy(new_args);
1260     if (subchannel == nullptr) return nullptr;
1261     return MakeRefCounted<SubchannelWrapper>(
1262         chand_, subchannel, std::move(health_check_service_name));
1263   }
1264
1265   void UpdateState(
1266       grpc_connectivity_state state,
1267       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
1268     grpc_error* disconnect_error = chand_->disconnect_error();
1269     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1270       const char* extra = disconnect_error == GRPC_ERROR_NONE
1271                               ? ""
1272                               : " (ignoring -- channel shutting down)";
1273       gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
1274               grpc_connectivity_state_name(state), picker.get(), extra);
1275     }
1276     // Do update only if not shutting down.
1277     if (disconnect_error == GRPC_ERROR_NONE) {
1278       chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker));
1279     }
1280   }
1281
1282   // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
1283   void RequestReresolution() override {}
1284
1285   void AddTraceEvent(TraceSeverity severity, StringView message) override {
1286     if (chand_->channelz_node_ != nullptr) {
1287       chand_->channelz_node_->AddTraceEvent(
1288           ConvertSeverityEnum(severity),
1289           grpc_slice_from_copied_buffer(message.data(), message.size()));
1290     }
1291   }
1292
1293  private:
1294   static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1295       TraceSeverity severity) {
1296     if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1297     if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1298     return channelz::ChannelTrace::Error;
1299   }
1300
1301   ChannelData* chand_;
1302 };
1303
1304 //
1305 // ChannelData implementation
1306 //
1307
1308 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1309                               grpc_channel_element_args* args) {
1310   GPR_ASSERT(args->is_last);
1311   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1312   grpc_error* error = GRPC_ERROR_NONE;
1313   new (elem->channel_data) ChannelData(args, &error);
1314   return error;
1315 }
1316
1317 void ChannelData::Destroy(grpc_channel_element* elem) {
1318   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1319   chand->~ChannelData();
1320 }
1321
1322 bool GetEnableRetries(const grpc_channel_args* args) {
1323   return grpc_channel_arg_get_bool(
1324       grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1325 }
1326
1327 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1328   return static_cast<size_t>(grpc_channel_arg_get_integer(
1329       grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1330       {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1331 }
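
// Sketch of how a client can override this limit at channel-creation
// time (the value is clamped to [0, INT_MAX] by the lookup above);
// `target` is assumed to be the server URI:
#if 0
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
arg.value.integer = 64 << 10;  // Buffer at most 64 KiB per RPC.
grpc_channel_args args = {1, &arg};
grpc_channel* channel = grpc_insecure_channel_create(target, &args, nullptr);
#endif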
1332
1333 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1334     const grpc_channel_args* args) {
1335   const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1336       grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1337   if (use_local_subchannel_pool) {
1338     return MakeRefCounted<LocalSubchannelPool>();
1339   }
1340   return GlobalSubchannelPool::instance();
1341 }
1342
1343 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1344   const grpc_arg* arg =
1345       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1346   if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1347     return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1348   }
1349   return nullptr;
1350 }
1351
1352 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1353     : deadline_checking_enabled_(
1354           grpc_deadline_checking_enabled(args->channel_args)),
1355       enable_retries_(GetEnableRetries(args->channel_args)),
1356       per_rpc_retry_buffer_size_(
1357           GetMaxPerRpcRetryBufferSize(args->channel_args)),
1358       owning_stack_(args->channel_stack),
1359       client_channel_factory_(
1360           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1361       channelz_node_(GetChannelzNode(args->channel_args)),
1362       combiner_(grpc_combiner_create()),
1363       interested_parties_(grpc_pollset_set_create()),
1364       subchannel_pool_(GetSubchannelPool(args->channel_args)),
1365       disconnect_error_(GRPC_ERROR_NONE) {
1366   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1367     gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1368             this, owning_stack_);
1369   }
1370   // Initialize data members.
1371   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1372                                "client_channel");
1373   gpr_mu_init(&info_mu_);
1374   // Start backup polling.
1375   grpc_client_channel_start_backup_polling(interested_parties_);
1376   // Check client channel factory.
1377   if (client_channel_factory_ == nullptr) {
1378     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1379         "Missing client channel factory in args for client channel filter");
1380     return;
1381   }
1382   // Get server name to resolve, using proxy mapper if needed.
1383   const char* server_uri = grpc_channel_arg_get_string(
1384       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1385   if (server_uri == nullptr) {
1386     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1387         "server URI channel arg missing or wrong type in client channel "
1388         "filter");
1389     return;
1390   }
1391   // Get the default service config, if any.
1392   const char* service_config_json = grpc_channel_arg_get_string(
1393       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1394   if (service_config_json != nullptr) {
1395     *error = GRPC_ERROR_NONE;
1396     default_service_config_ = ServiceConfig::Create(service_config_json, error);
1397     if (*error != GRPC_ERROR_NONE) {
1398       default_service_config_.reset();
1399       return;
1400     }
1401   }
1402   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1403   if (uri != nullptr && uri->path[0] != '\0') {
1404     server_name_.reset(
1405         gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1406   }
1407   grpc_uri_destroy(uri);
1408   char* proxy_name = nullptr;
1409   grpc_channel_args* new_args = nullptr;
1410   grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1411                               &new_args);
1412   target_uri_.reset(proxy_name != nullptr ? proxy_name
1413                                           : gpr_strdup(server_uri));
1414   channel_args_ = new_args != nullptr
1415                       ? new_args
1416                       : grpc_channel_args_copy(args->channel_args);
1417   if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
1418     *error =
1419         GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target URI is not valid");
1420     return;
1421   }
1422   *error = GRPC_ERROR_NONE;
1423 }
1424
1425 ChannelData::~ChannelData() {
1426   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1427     gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1428   }
1429   DestroyResolvingLoadBalancingPolicyLocked();
1430   grpc_channel_args_destroy(channel_args_);
1431   // Stop backup polling.
1432   grpc_client_channel_stop_backup_polling(interested_parties_);
1433   grpc_pollset_set_destroy(interested_parties_);
1434   GRPC_COMBINER_UNREF(combiner_, "client_channel");
1435   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1436   grpc_connectivity_state_destroy(&state_tracker_);
1437   gpr_mu_destroy(&info_mu_);
1438 }
1439
1440 void ChannelData::UpdateStateAndPickerLocked(
1441     grpc_connectivity_state state, const char* reason,
1442     UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
1443   // Clean up control-plane state when entering IDLE.
1444   if (picker_ == nullptr) {
1445     health_check_service_name_.reset();
1446     saved_service_config_.reset();
1447     received_first_resolver_result_ = false;
1448   }
1449   // Update connectivity state.
1450   grpc_connectivity_state_set(&state_tracker_, state, reason);
1451   if (channelz_node_ != nullptr) {
1452     channelz_node_->SetConnectivityState(state);
1453     channelz_node_->AddTraceEvent(
1454         channelz::ChannelTrace::Severity::Info,
1455         grpc_slice_from_static_string(
1456             channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1457                 state)));
1458   }
1459   // Grab data plane lock to do subchannel updates and update the picker.
1460   //
1461   // Note that we want to minimize the work done while holding the data
1462   // plane lock, to keep the critical section small.  So, for all of the
1463   // objects that we might wind up unreffing here, we actually hold onto
1464   // the refs until after we release the lock, and then unref them at
1465   // that point.  This includes the following:
1466   // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
1467   // - ref stored in retry_throttle_data_
1468   // - ref stored in service_config_
1469   // - ownership of the existing picker in picker_
1470   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
1471   RefCountedPtr<ServiceConfig> service_config_to_unref;
1472   {
1473     MutexLock lock(&data_plane_mu_);
1474     // Handle subchannel updates.
1475     for (auto& p : pending_subchannel_updates_) {
1476       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1477         gpr_log(GPR_INFO,
1478                 "chand=%p: updating subchannel wrapper %p data plane "
1479                 "connected_subchannel to %p",
1480                 this, p.first.get(), p.second.get());
1481       }
1482       // Note: We do not remove the entry from pending_subchannel_updates_
1483       // here, since this would unref the subchannel wrapper; instead,
1484       // we wait until we've released the lock to clear the map.
1485       p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
1486     }
1487     // Swap out the picker.
1488     // Note: Original value will be destroyed after the lock is released.
1489     picker_.swap(picker);
1490     // Clean the data plane if the updated picker is nullptr.
1491     if (picker_ == nullptr) {
1492       received_service_config_data_ = false;
1493       // Note: We save the objects to unref until after the lock is released.
1494       retry_throttle_data_to_unref = std::move(retry_throttle_data_);
1495       service_config_to_unref = std::move(service_config_);
1496     }
1497     // Re-process queued picks.
1498     for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
1499       grpc_call_element* elem = pick->elem;
1500       CallData* calld = static_cast<CallData*>(elem->call_data);
1501       grpc_error* error = GRPC_ERROR_NONE;
1502       if (calld->PickSubchannelLocked(elem, &error)) {
1503         calld->AsyncPickDone(elem, error);
1504       }
1505     }
1506   }
1507   // Clear the pending update map after releasing the lock, to keep the
1508   // critical section small.
1509   pending_subchannel_updates_.clear();
1510 }
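
// The deferred-unref idiom used above, reduced to its essence (sketch;
// `T`, `mu_`, and `obj_` are hypothetical):
#if 0
RefCountedPtr<T> to_unref;
{
  MutexLock lock(&mu_);
  to_unref = std::move(obj_);  // Only a pointer swap happens under the lock.
}
// `to_unref` goes out of scope here, so the final Unref() (and any
// destructor work) runs outside the critical section.
#endif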
1511
1512 void ChannelData::UpdateServiceConfigLocked(
1513     RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
1514     RefCountedPtr<ServiceConfig> service_config) {
1515   // Grab data plane lock to update service config.
1516   //
1517   // We defer unreffing the old values (and deallocating memory) until
1518   // after releasing the lock to keep the critical section small.
1519   {
1520     MutexLock lock(&data_plane_mu_);
1521     // Update service config.
1522     received_service_config_data_ = true;
1523     // Old values will be unreffed after lock is released.
1524     retry_throttle_data_.swap(retry_throttle_data);
1525     service_config_.swap(service_config);
1526     // Apply service config to queued picks.
1527     for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
1528       CallData* calld = static_cast<CallData*>(pick->elem->call_data);
1529       calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
1530     }
1531   }
1532   // Old values will be unreffed after lock is released when they go out
1533   // of scope.
1534 }
1535
1536 void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
1537   // Instantiate resolving LB policy.
1538   LoadBalancingPolicy::Args lb_args;
1539   lb_args.combiner = combiner_;
1540   lb_args.channel_control_helper =
1541       UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1542           New<ClientChannelControlHelper>(this));
1543   lb_args.args = channel_args_;
1544   UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
1545   resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1546       std::move(lb_args), &grpc_client_channel_routing_trace,
1547       std::move(target_uri), ProcessResolverResultLocked, this));
1548   grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1549                                    interested_parties_);
1550   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1551     gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1552             resolving_lb_policy_.get());
1553   }
1554 }
1555
1556 void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
1557   if (resolving_lb_policy_ != nullptr) {
1558     grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1559                                      interested_parties_);
1560     resolving_lb_policy_.reset();
1561   }
1562 }
1563
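// LB policy selection, as implemented below:
// - If the service config has a `loadBalancingConfig`, it wins outright.
// - Otherwise, use the deprecated `loadBalancingPolicy` service config
//   field or, failing that, the GRPC_ARG_LB_POLICY_NAME channel arg --
//   except that if any balancer address is present, "grpclb" is forced.
// - If nothing was specified, default to "pick_first".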
1564 void ChannelData::ProcessLbPolicy(
1565     const Resolver::Result& resolver_result,
1566     const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
1567     UniquePtr<char>* lb_policy_name,
1568     RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
1569   // Prefer the LB policy name found in the service config.
1570   if (parsed_service_config != nullptr &&
1571       parsed_service_config->parsed_lb_config() != nullptr) {
1572     lb_policy_name->reset(
1573         gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1574     *lb_policy_config = parsed_service_config->parsed_lb_config();
1575     return;
1576   }
1577   const char* local_policy_name = nullptr;
1578   if (parsed_service_config != nullptr &&
1579       parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1580     local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1581   } else {
1582     const grpc_arg* channel_arg =
1583         grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1584     local_policy_name = grpc_channel_arg_get_string(channel_arg);
1585   }
1586   // Special case: If at least one balancer address is present, we use
1587   // the grpclb policy, regardless of what the resolver has returned.
1588   bool found_balancer_address = false;
1589   for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1590     const ServerAddress& address = resolver_result.addresses[i];
1591     if (address.IsBalancer()) {
1592       found_balancer_address = true;
1593       break;
1594     }
1595   }
1596   if (found_balancer_address) {
1597     if (local_policy_name != nullptr &&
1598         strcmp(local_policy_name, "grpclb") != 0) {
1599       gpr_log(GPR_INFO,
1600               "resolver requested LB policy %s but provided at least one "
1601               "balancer address -- forcing use of grpclb LB policy",
1602               local_policy_name);
1603     }
1604     local_policy_name = "grpclb";
1605   }
1606   // Use pick_first if nothing was specified and we didn't select grpclb
1607   // above.
1608   lb_policy_name->reset(gpr_strdup(
1609       local_policy_name == nullptr ? "pick_first" : local_policy_name));
1610 }
1611
1612 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1613 // resolver result update.
1614 bool ChannelData::ProcessResolverResultLocked(
1615     void* arg, const Resolver::Result& result, const char** lb_policy_name,
1616     RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
1617     grpc_error** service_config_error) {
1618   ChannelData* chand = static_cast<ChannelData*>(arg);
1619   RefCountedPtr<ServiceConfig> service_config;
1620   // If resolver did not return a service config or returned an invalid service
1621   // config, we need a fallback service config.
1622   if (result.service_config_error != GRPC_ERROR_NONE) {
1623     // If the service config was invalid, fall back to the saved service
1624     // config; if there is no saved config either, use the default service
1625     // config.
1626     if (chand->saved_service_config_ != nullptr) {
1627       service_config = chand->saved_service_config_;
1628       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1629         gpr_log(GPR_INFO,
1630                 "chand=%p: resolver returned invalid service config. "
1631                 "Continuing to use previous service config.",
1632                 chand);
1633       }
1634     } else if (chand->default_service_config_ != nullptr) {
1635       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1636         gpr_log(GPR_INFO,
1637                 "chand=%p: resolver returned invalid service config. Using "
1638                 "default service config provided by client API.",
1639                 chand);
1640       }
1641       service_config = chand->default_service_config_;
1642     }
1643   } else if (result.service_config == nullptr) {
1644     if (chand->default_service_config_ != nullptr) {
1645       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1646         gpr_log(GPR_INFO,
1647                 "chand=%p: resolver returned no service config. Using default "
1648                 "service config provided by client API.",
1649                 chand);
1650       }
1651       service_config = chand->default_service_config_;
1652     }
1653   } else {
1654     service_config = result.service_config;
1655   }
1656   *service_config_error = GRPC_ERROR_REF(result.service_config_error);
1657   if (service_config == nullptr &&
1658       result.service_config_error != GRPC_ERROR_NONE) {
1659     return false;
1660   }
1661   // Process service config.
1662   UniquePtr<char> service_config_json;
1663   const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1664       nullptr;
1665   if (service_config != nullptr) {
1666     parsed_service_config =
1667         static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1668             service_config->GetGlobalParsedConfig(
1669                 internal::ClientChannelServiceConfigParser::ParserIndex()));
1670   }
1671   // Check if the config has changed.
1672   const bool service_config_changed =
1673       ((service_config == nullptr) !=
1674        (chand->saved_service_config_ == nullptr)) ||
1675       (service_config != nullptr &&
1676        strcmp(service_config->service_config_json(),
1677               chand->saved_service_config_->service_config_json()) != 0);
1678   if (service_config_changed) {
1679     service_config_json.reset(gpr_strdup(
1680         service_config != nullptr ? service_config->service_config_json()
1681                                   : ""));
1682     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1683       gpr_log(GPR_INFO,
1684               "chand=%p: resolver returned updated service config: \"%s\"",
1685               chand, service_config_json.get());
1686     }
1687     // Save health check service name.
1688     if (service_config != nullptr) {
1689       chand->health_check_service_name_.reset(
1690           gpr_strdup(parsed_service_config->health_check_service_name()));
1691     } else {
1692       chand->health_check_service_name_.reset();
1693     }
1694     // Update health check service name used by existing subchannel wrappers.
1695     for (const auto& p : chand->subchannel_wrappers_) {
1696       p.first->UpdateHealthCheckServiceName(
1697           UniquePtr<char>(gpr_strdup(chand->health_check_service_name_.get())));
1698     }
1699     // Save service config.
1700     chand->saved_service_config_ = std::move(service_config);
1701   }
1702   // We want to set the service config at least once, even if it has not
1703   // changed. This should not strictly be necessary, but we do it defensively;
1704   // it can be removed if it proves unnecessary.
1705   if (service_config_changed || !chand->received_first_resolver_result_) {
1706     chand->received_first_resolver_result_ = true;
1707     RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
1708     if (parsed_service_config != nullptr) {
1709       Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
1710           retry_throttle_config = parsed_service_config->retry_throttling();
1711       if (retry_throttle_config.has_value()) {
1712         retry_throttle_data =
1713             internal::ServerRetryThrottleMap::GetDataForServer(
1714                 chand->server_name_.get(),
1715                 retry_throttle_config.value().max_milli_tokens,
1716                 retry_throttle_config.value().milli_token_ratio);
1717       }
1718     }
1719     chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
1720                                      chand->saved_service_config_);
1721   }
1722   UniquePtr<char> processed_lb_policy_name;
1723   chand->ProcessLbPolicy(result, parsed_service_config,
1724                          &processed_lb_policy_name, lb_policy_config);
1725   // Swap out the data used by GetChannelInfo().
1726   {
1727     MutexLock lock(&chand->info_mu_);
1728     chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
1729     if (service_config_json != nullptr) {
1730       chand->info_service_config_json_ = std::move(service_config_json);
1731     }
1732   }
1733   // Return results.
1734   *lb_policy_name = chand->info_lb_policy_name_.get();
1735   return service_config_changed;
1736 }
1737
1738 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
1739   if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
1740     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1741   }
1742   LoadBalancingPolicy::PickResult result =
1743       picker_->Pick(LoadBalancingPolicy::PickArgs());
1744   ConnectedSubchannel* connected_subchannel = nullptr;
1745   if (result.subchannel != nullptr) {
1746     SubchannelWrapper* subchannel =
1747         static_cast<SubchannelWrapper*>(result.subchannel.get());
1748     connected_subchannel = subchannel->connected_subchannel();
1749   }
1750   if (connected_subchannel != nullptr) {
1751     connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
1752   } else {
1753     if (result.error == GRPC_ERROR_NONE) {
1754       result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1755           "LB policy dropped call on ping");
1756     }
1757   }
1758   return result.error;
1759 }
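
// Surface-level counterpart (sketch): a client-initiated ping, which
// reaches DoPingLocked() above.  Assumes `channel`, `cq`, and `tag`
// already exist; as the code above shows, the ping fails unless the
// channel is READY.
#if 0
grpc_channel_ping(channel, cq, tag, /*reserved=*/nullptr);
#endif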
1760
1761 void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
1762   grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
1763   grpc_channel_element* elem =
1764       static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
1765   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1766   // Connectivity watch.
1767   if (op->on_connectivity_state_change != nullptr) {
1768     grpc_connectivity_state_notify_on_state_change(
1769         &chand->state_tracker_, op->connectivity_state,
1770         op->on_connectivity_state_change);
1771     op->on_connectivity_state_change = nullptr;
1772     op->connectivity_state = nullptr;
1773   }
1774   // Ping.
1775   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1776     grpc_error* error = chand->DoPingLocked(op);
1777     if (error != GRPC_ERROR_NONE) {
1778       GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
1779       GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
1780     }
1781     op->bind_pollset = nullptr;
1782     op->send_ping.on_initiate = nullptr;
1783     op->send_ping.on_ack = nullptr;
1784   }
1785   // Reset backoff.
1786   if (op->reset_connect_backoff) {
1787     if (chand->resolving_lb_policy_ != nullptr) {
1788       chand->resolving_lb_policy_->ResetBackoffLocked();
1789     }
1790   }
1791   // Disconnect or enter IDLE.
1792   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1793     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1794       gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", chand,
1795               grpc_error_string(op->disconnect_with_error));
1796     }
1797     chand->DestroyResolvingLoadBalancingPolicyLocked();
1798     intptr_t value;
1799     if (grpc_error_get_int(op->disconnect_with_error,
1800                            GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
1801         static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
1802       if (chand->disconnect_error() == GRPC_ERROR_NONE) {
1803         // Enter IDLE state.
1804         chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE,
1805                                           "channel entering IDLE", nullptr);
1806       }
1807       GRPC_ERROR_UNREF(op->disconnect_with_error);
1808     } else {
1809       // Disconnect.
1810       GPR_ASSERT(chand->disconnect_error_.Load(MemoryOrder::RELAXED) ==
1811                  GRPC_ERROR_NONE);
1812       chand->disconnect_error_.Store(op->disconnect_with_error,
1813                                      MemoryOrder::RELEASE);
1814       chand->UpdateStateAndPickerLocked(
1815           GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
1816           UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
1817               New<LoadBalancingPolicy::TransientFailurePicker>(
1818                   GRPC_ERROR_REF(op->disconnect_with_error))));
1819     }
1820   }
1821   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
1822   GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1823 }
1824
1825 void ChannelData::StartTransportOp(grpc_channel_element* elem,
1826                                    grpc_transport_op* op) {
1827   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1828   GPR_ASSERT(op->set_accept_stream == false);
1829   // Handle bind_pollset.
1830   if (op->bind_pollset != nullptr) {
1831     grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1832   }
1833   // Hop into the control-plane combiner to handle the remaining ops.
1834   op->handler_private.extra_arg = elem;
1835   GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1836   GRPC_CLOSURE_SCHED(
1837       GRPC_CLOSURE_INIT(&op->handler_private.closure,
1838                         ChannelData::StartTransportOpLocked, op,
1839                         grpc_combiner_scheduler(chand->combiner_)),
1840       GRPC_ERROR_NONE);
1841 }
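
// Note the pattern here: ops that touch control-plane state are bounced
// into the channel's combiner, holding a channel-stack ref
// ("start_transport_op") that StartTransportOpLocked() releases once the
// op has been processed.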
1842
1843 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1844                                  const grpc_channel_info* info) {
1845   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1846   MutexLock lock(&chand->info_mu_);
1847   if (info->lb_policy_name != nullptr) {
1848     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1849   }
1850   if (info->service_config_json != nullptr) {
1851     *info->service_config_json =
1852         gpr_strdup(chand->info_service_config_json_.get());
1853   }
1854 }
1855
1856 void ChannelData::AddQueuedPick(QueuedPick* pick,
1857                                 grpc_polling_entity* pollent) {
1858   // Add call to queued picks list.
1859   pick->next = queued_picks_;
1860   queued_picks_ = pick;
1861   // Add call's pollent to channel's interested_parties, so that I/O
1862   // can be done under the call's CQ.
1863   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1864 }
1865
1866 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1867                                    grpc_polling_entity* pollent) {
1868   // Remove call's pollent from channel's interested_parties.
1869   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1870   // Remove from queued picks list.
1871   for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1872        pick = &(*pick)->next) {
1873     if (*pick == to_remove) {
1874       *pick = to_remove->next;
1875       return;
1876     }
1877   }
1878 }
1879
1880 RefCountedPtr<ConnectedSubchannel>
1881 ChannelData::GetConnectedSubchannelInDataPlane(
1882     SubchannelInterface* subchannel) const {
1883   SubchannelWrapper* subchannel_wrapper =
1884       static_cast<SubchannelWrapper*>(subchannel);
1885   ConnectedSubchannel* connected_subchannel =
1886       subchannel_wrapper->connected_subchannel_in_data_plane();
1887   if (connected_subchannel == nullptr) return nullptr;
1888   return connected_subchannel->Ref();
1889 }
1890
1891 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1892   auto* chand = static_cast<ChannelData*>(arg);
1893   if (chand->resolving_lb_policy_ != nullptr) {
1894     chand->resolving_lb_policy_->ExitIdleLocked();
1895   } else {
1896     chand->CreateResolvingLoadBalancingPolicyLocked();
1897   }
1898   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
1899 }
1900
1901 grpc_connectivity_state ChannelData::CheckConnectivityState(
1902     bool try_to_connect) {
1903   grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1904   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1905     GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1906     GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1907                                            grpc_combiner_scheduler(combiner_)),
1908                        GRPC_ERROR_NONE);
1909   }
1910   return out;
1911 }
1912
1913 //
1914 // CallData implementation
1915 //
1916
1917 // Retry support:
1918 //
1919 // In order to support retries, we act as a proxy for stream op batches.
1920 // When we get a batch from the surface, we add it to our list of pending
1921 // batches, and we then use those batches to construct separate "child"
1922 // batches to be started on the subchannel call.  When the child batches
1923 // return, we then decide which pending batches have been completed and
1924 // schedule their callbacks accordingly.  If a subchannel call fails and
1925 // we want to retry it, we do a new pick and start again, constructing
1926 // new "child" batches for the new subchannel call.
1927 //
1928 // Note that retries are committed when receiving data from the server
1929 // (except for Trailers-Only responses).  However, there may be many
1930 // send ops started before receiving any data, so we may have already
1931 // completed some number of send ops (and returned the completions up to
1932 // the surface) by the time we realize that we need to retry.  To deal
1933 // with this, we cache data for send ops, so that we can replay them on a
1934 // different subchannel call even after we have completed the original
1935 // batches.
1936 //
1937 // There are two sets of data to maintain:
1938 // - In call_data (in the parent channel), we maintain a list of pending
1939 //   ops and cached data for send ops.
1940 // - In the subchannel call, we maintain state to indicate what ops have
1941 //   already been sent down to that call.
1942 //
1943 // When constructing the "child" batches, we compare those two sets of
1944 // data to see which batches need to be sent to the subchannel call.
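//
// Example flow: the surface starts send_initial_metadata and
// send_message, both complete on attempt 1, and attempt 1 then fails
// with a retryable status before any data was received.  Because the
// send ops were cached, attempt 2 can replay them on a new subchannel
// call even though their completions were already surfaced.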
1945
1946 // TODO(roth): In subsequent PRs:
1947 // - add support for transparent retries (including initial metadata)
1948 // - figure out how to record stats in census for retries
1949 //   (census filter is on top of this one)
1950 // - add census stats for retries
1951
1952 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
1953                    const grpc_call_element_args& args)
1954     : deadline_state_(elem, args.call_stack, args.call_combiner,
1955                       GPR_LIKELY(chand.deadline_checking_enabled())
1956                           ? args.deadline
1957                           : GRPC_MILLIS_INF_FUTURE),
1958       path_(grpc_slice_ref_internal(args.path)),
1959       call_start_time_(args.start_time),
1960       deadline_(args.deadline),
1961       arena_(args.arena),
1962       owning_call_(args.call_stack),
1963       call_combiner_(args.call_combiner),
1964       call_context_(args.context),
1965       lb_call_state_(this),
1966       pending_send_initial_metadata_(false),
1967       pending_send_message_(false),
1968       pending_send_trailing_metadata_(false),
1969       enable_retries_(chand.enable_retries()),
1970       retry_committed_(false),
1971       last_attempt_got_server_pushback_(false) {}
1972
1973 CallData::~CallData() {
1974   grpc_slice_unref_internal(path_);
1975   GRPC_ERROR_UNREF(cancel_error_);
1976   if (backend_metric_data_ != nullptr) {
1977     backend_metric_data_
1978         ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
1979   }
1980   // Make sure there are no remaining pending batches.
1981   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1982     GPR_ASSERT(pending_batches_[i].batch == nullptr);
1983   }
1984 }
1985
1986 grpc_error* CallData::Init(grpc_call_element* elem,
1987                            const grpc_call_element_args* args) {
1988   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1989   new (elem->call_data) CallData(elem, *chand, *args);
1990   return GRPC_ERROR_NONE;
1991 }
1992
1993 void CallData::Destroy(grpc_call_element* elem,
1994                        const grpc_call_final_info* final_info,
1995                        grpc_closure* then_schedule_closure) {
1996   CallData* calld = static_cast<CallData*>(elem->call_data);
1997   if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
1998     calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
1999     then_schedule_closure = nullptr;
2000   }
2001   calld->~CallData();
2002   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
2003 }
2004
2005 void CallData::StartTransportStreamOpBatch(
2006     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2007   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
2008   CallData* calld = static_cast<CallData*>(elem->call_data);
2009   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2010   if (GPR_LIKELY(chand->deadline_checking_enabled())) {
2011     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
2012   }
2013   // If we've previously been cancelled, immediately fail any new batches.
2014   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
2015     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2016       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
2017               chand, calld, grpc_error_string(calld->cancel_error_));
2018     }
2019     // Note: This will release the call combiner.
2020     grpc_transport_stream_op_batch_finish_with_failure(
2021         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
2022     return;
2023   }
2024   // Handle cancellation.
2025   if (GPR_UNLIKELY(batch->cancel_stream)) {
2026     // Stash a copy of cancel_error in our call data, so that we can use
2027     // it for subsequent operations.  This ensures that if the call is
2028     // cancelled before any batches are passed down (e.g., if the deadline
2029     // is in the past when the call starts), we can return the right
2030     // error to the caller when the first batch does get passed down.
2031     GRPC_ERROR_UNREF(calld->cancel_error_);
2032     calld->cancel_error_ =
2033         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2034     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2035       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
2036               calld, grpc_error_string(calld->cancel_error_));
2037     }
2038     // If we do not have a subchannel call (i.e., a pick has not yet
2039     // been started), fail all pending batches.  Otherwise, send the
2040     // cancellation down to the subchannel call.
2041     if (calld->subchannel_call_ == nullptr) {
2042       // TODO(roth): If there is a pending retry callback, do we need to
2043       // cancel it here?
2044       calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
2045                                 NoYieldCallCombiner);
2046       // Note: This will release the call combiner.
2047       grpc_transport_stream_op_batch_finish_with_failure(
2048           batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
2049     } else {
2050       // Note: This will release the call combiner.
2051       calld->subchannel_call_->StartTransportStreamOpBatch(batch);
2052     }
2053     return;
2054   }
2055   // Add the batch to the pending list.
2056   calld->PendingBatchesAdd(elem, batch);
2057   // Check if we've already gotten a subchannel call.
2058   // Note that once we have picked a subchannel, we no longer need to
2059   // acquire the channel's data plane mutex, which makes this path more
2060   // efficient (especially for streaming calls).
2061   if (calld->subchannel_call_ != nullptr) {
2062     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2063       gpr_log(GPR_INFO,
2064               "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
2065               calld, calld->subchannel_call_.get());
2066     }
2067     calld->PendingBatchesResume(elem);
2068     return;
2069   }
2070   // We do not yet have a subchannel call.
2071   // For batches containing a send_initial_metadata op, acquire the
2072   // channel's data plane mutex to pick a subchannel.
2073   if (GPR_LIKELY(batch->send_initial_metadata)) {
2074     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2075       gpr_log(GPR_INFO,
2076               "chand=%p calld=%p: grabbing data plane mutex to perform pick",
2077               chand, calld);
2078     }
2079     PickSubchannel(elem, GRPC_ERROR_NONE);
2080   } else {
2081     // For all other batches, release the call combiner.
2082     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2083       gpr_log(GPR_INFO,
2084               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
2085               calld);
2086     }
2087     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2088                             "batch does not include send_initial_metadata");
2089   }
2090 }
2091
2092 void CallData::SetPollent(grpc_call_element* elem,
2093                           grpc_polling_entity* pollent) {
2094   CallData* calld = static_cast<CallData*>(elem->call_data);
2095   calld->pollent_ = pollent;
2096 }
2097
2098 //
2099 // send op data caching
2100 //
2101
2102 void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
2103   if (pending->send_ops_cached) return;
2104   pending->send_ops_cached = true;
2105   grpc_transport_stream_op_batch* batch = pending->batch;
2106   // Save a copy of metadata for send_initial_metadata ops.
2107   if (batch->send_initial_metadata) {
2108     seen_send_initial_metadata_ = true;
2109     GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
2110     grpc_metadata_batch* send_initial_metadata =
2111         batch->payload->send_initial_metadata.send_initial_metadata;
2112     send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
2113         sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
2114     grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
2115                              send_initial_metadata_storage_);
2116     send_initial_metadata_flags_ =
2117         batch->payload->send_initial_metadata.send_initial_metadata_flags;
2118     peer_string_ = batch->payload->send_initial_metadata.peer_string;
2119   }
2120   // Set up cache for send_message ops.
2121   if (batch->send_message) {
2122     ByteStreamCache* cache = arena_->New<ByteStreamCache>(
2123         std::move(batch->payload->send_message.send_message));
2124     send_messages_.push_back(cache);
2125   }
2126   // Save metadata batch for send_trailing_metadata ops.
2127   if (batch->send_trailing_metadata) {
2128     seen_send_trailing_metadata_ = true;
2129     GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
2130     grpc_metadata_batch* send_trailing_metadata =
2131         batch->payload->send_trailing_metadata.send_trailing_metadata;
2132     send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
2133         sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
2134     grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
2135                              send_trailing_metadata_storage_);
2136   }
2137 }
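
// All of the caches above live in the call's arena, so they remain valid
// for replay on later attempts and are released along with the call.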
2138
2139 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
2140   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2141     gpr_log(GPR_INFO,
2142             "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
2143             this);
2144   }
2145   grpc_metadata_batch_destroy(&send_initial_metadata_);
2146 }
2147
2148 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
2149   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2150     gpr_log(GPR_INFO,
2151             "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
2152             chand, this, idx);
2153   }
2154   send_messages_[idx]->Destroy();
2155 }
2156
2157 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
2158   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2159     gpr_log(GPR_INFO,
2160             "chand=%p calld=%p: destroying calld->send_trailing_metadata",
2161             chand, this);
2162   }
2163   grpc_metadata_batch_destroy(&send_trailing_metadata_);
2164 }
2165
2166 void CallData::FreeCachedSendOpDataAfterCommit(
2167     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
2168   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2169   if (retry_state->completed_send_initial_metadata) {
2170     FreeCachedSendInitialMetadata(chand);
2171   }
2172   for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
2173     FreeCachedSendMessage(chand, i);
2174   }
2175   if (retry_state->completed_send_trailing_metadata) {
2176     FreeCachedSendTrailingMetadata(chand);
2177   }
2178 }
2179
2180 void CallData::FreeCachedSendOpDataForCompletedBatch(
2181     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2182     SubchannelCallRetryState* retry_state) {
2183   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2184   if (batch_data->batch.send_initial_metadata) {
2185     FreeCachedSendInitialMetadata(chand);
2186   }
2187   if (batch_data->batch.send_message) {
2188     FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
2189   }
2190   if (batch_data->batch.send_trailing_metadata) {
2191     FreeCachedSendTrailingMetadata(chand);
2192   }
2193 }
2194
2195 //
2196 // LB recv_trailing_metadata_ready handling
2197 //
2198
2199 void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
2200     void* arg, grpc_error* error) {
2201   CallData* calld = static_cast<CallData*>(arg);
2202   // Invoke callback to LB policy.
2203   Metadata trailing_metadata(calld, calld->recv_trailing_metadata_);
2204   calld->lb_recv_trailing_metadata_ready_(
2205       calld->lb_recv_trailing_metadata_ready_user_data_, error,
2206       &trailing_metadata, &calld->lb_call_state_);
2207   // Chain to original callback.
2208   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
2209                    GRPC_ERROR_REF(error));
2210 }
2211
2212 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
2213     grpc_transport_stream_op_batch* batch) {
2214   if (lb_recv_trailing_metadata_ready_ != nullptr) {
2215     recv_trailing_metadata_ =
2216         batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2217     original_recv_trailing_metadata_ready_ =
2218         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2219     GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
2220                       RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
2221                       grpc_schedule_on_exec_ctx);
2222     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2223         &recv_trailing_metadata_ready_;
2224   }
2225 }
2226
2227 //
2228 // pending_batches management
2229 //
2230
2231 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
2232   // Note: It is important that send_initial_metadata be the first entry
2233   // here, since the code in pick_subchannel_locked() assumes it will be.
2234   if (batch->send_initial_metadata) return 0;
2235   if (batch->send_message) return 1;
2236   if (batch->send_trailing_metadata) return 2;
2237   if (batch->recv_initial_metadata) return 3;
2238   if (batch->recv_message) return 4;
2239   if (batch->recv_trailing_metadata) return 5;
2240   GPR_UNREACHABLE_CODE(return (size_t)-1);
2241 }
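
// Each slot holds at most one batch at a time (see the GPR_ASSERT in
// PendingBatchesAdd() below), so six fixed slots suffice; a combined
// batch is filed under the first of its ops in the order tested above
// (e.g., a send_initial_metadata + send_message batch lands in slot 0).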
2242
2243 // This is called via the call combiner, so access to calld is synchronized.
2244 void CallData::PendingBatchesAdd(grpc_call_element* elem,
2245                                  grpc_transport_stream_op_batch* batch) {
2246   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2247   const size_t idx = GetBatchIndex(batch);
2248   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2249     gpr_log(GPR_INFO,
2250             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2251             this, idx);
2252   }
2253   PendingBatch* pending = &pending_batches_[idx];
2254   GPR_ASSERT(pending->batch == nullptr);
2255   pending->batch = batch;
2256   pending->send_ops_cached = false;
2257   if (enable_retries_) {
2258     // Update state in calld about pending batches.
2259     // Also check if the batch takes us over the retry buffer limit.
2260     // Note: We don't check the size of trailing metadata here, because
2261     // gRPC clients do not send trailing metadata.
2262     if (batch->send_initial_metadata) {
2263       pending_send_initial_metadata_ = true;
2264       bytes_buffered_for_retry_ += grpc_metadata_batch_size(
2265           batch->payload->send_initial_metadata.send_initial_metadata);
2266     }
2267     if (batch->send_message) {
2268       pending_send_message_ = true;
2269       bytes_buffered_for_retry_ +=
2270           batch->payload->send_message.send_message->length();
2271     }
2272     if (batch->send_trailing_metadata) {
2273       pending_send_trailing_metadata_ = true;
2274     }
2275     if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
2276                      chand->per_rpc_retry_buffer_size())) {
2277       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2278         gpr_log(GPR_INFO,
2279                 "chand=%p calld=%p: exceeded retry buffer size, committing",
2280                 chand, this);
2281       }
2282       SubchannelCallRetryState* retry_state =
2283           subchannel_call_ == nullptr ? nullptr
2284                                       : static_cast<SubchannelCallRetryState*>(
2285                                             subchannel_call_->GetParentData());
2286       RetryCommit(elem, retry_state);
2287       // If we are not going to retry and have not yet started an attempt,
2288       // pretend retries are disabled to avoid the retry overhead.
2289       if (num_attempts_completed_ == 0) {
2290         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2291           gpr_log(GPR_INFO,
2292                   "chand=%p calld=%p: disabling retries before first attempt",
2293                   chand, this);
2294         }
2295         enable_retries_ = false;
2296       }
2297     }
2298   }
2299 }
2300
2301 void CallData::PendingBatchClear(PendingBatch* pending) {
2302   if (enable_retries_) {
2303     if (pending->batch->send_initial_metadata) {
2304       pending_send_initial_metadata_ = false;
2305     }
2306     if (pending->batch->send_message) {
2307       pending_send_message_ = false;
2308     }
2309     if (pending->batch->send_trailing_metadata) {
2310       pending_send_trailing_metadata_ = false;
2311     }
2312   }
2313   pending->batch = nullptr;
2314 }
2315
2316 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
2317                                       PendingBatch* pending) {
2318   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2319   grpc_transport_stream_op_batch* batch = pending->batch;
2320   // We clear the pending batch once all of its callbacks have been
2321   // scheduled and their fields reset to nullptr.
2322   if (batch->on_complete == nullptr &&
2323       (!batch->recv_initial_metadata ||
2324        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
2325            nullptr) &&
2326       (!batch->recv_message ||
2327        batch->payload->recv_message.recv_message_ready == nullptr) &&
2328       (!batch->recv_trailing_metadata ||
2329        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
2330            nullptr)) {
2331     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2332       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
2333               this);
2334     }
2335     PendingBatchClear(pending);
2336   }
2337 }
2338
2339 // This is called via the call combiner, so access to calld is synchronized.
2340 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2341   grpc_transport_stream_op_batch* batch =
2342       static_cast<grpc_transport_stream_op_batch*>(arg);
2343   CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2344   // Note: This will release the call combiner.
2345   grpc_transport_stream_op_batch_finish_with_failure(
2346       batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2347 }
2348
2349 // This is called via the call combiner, so access to calld is synchronized.
2350 void CallData::PendingBatchesFail(
2351     grpc_call_element* elem, grpc_error* error,
2352     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2353   GPR_ASSERT(error != GRPC_ERROR_NONE);
2354   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2355     size_t num_batches = 0;
2356     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2357       if (pending_batches_[i].batch != nullptr) ++num_batches;
2358     }
2359     gpr_log(GPR_INFO,
2360             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2361             elem->channel_data, this, num_batches, grpc_error_string(error));
2362   }
2363   CallCombinerClosureList closures;
2364   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2365     PendingBatch* pending = &pending_batches_[i];
2366     grpc_transport_stream_op_batch* batch = pending->batch;
2367     if (batch != nullptr) {
2368       if (batch->recv_trailing_metadata) {
2369         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2370       }
2371       batch->handler_private.extra_arg = this;
2372       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2373                         FailPendingBatchInCallCombiner, batch,
2374                         grpc_schedule_on_exec_ctx);
2375       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2376                    "PendingBatchesFail");
2377       PendingBatchClear(pending);
2378     }
2379   }
2380   if (yield_call_combiner_predicate(closures)) {
2381     closures.RunClosures(call_combiner_);
2382   } else {
2383     closures.RunClosuresWithoutYielding(call_combiner_);
2384   }
2385   GRPC_ERROR_UNREF(error);
2386 }
2387
2388 // This is called via the call combiner, so access to calld is synchronized.
2389 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2390                                                 grpc_error* ignored) {
2391   grpc_transport_stream_op_batch* batch =
2392       static_cast<grpc_transport_stream_op_batch*>(arg);
2393   SubchannelCall* subchannel_call =
2394       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2395   // Note: This will release the call combiner.
2396   subchannel_call->StartTransportStreamOpBatch(batch);
2397 }
2398
2399 // This is called via the call combiner, so access to calld is synchronized.
2400 void CallData::PendingBatchesResume(grpc_call_element* elem) {
2401   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2402   if (enable_retries_) {
2403     StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
2404     return;
2405   }
2406   // Retries not enabled; send down batches as-is.
2407   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2408     size_t num_batches = 0;
2409     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2410       if (pending_batches_[i].batch != nullptr) ++num_batches;
2411     }
2412     gpr_log(GPR_INFO,
2413             "chand=%p calld=%p: starting %" PRIuPTR
2414             " pending batches on subchannel_call=%p",
2415             chand, this, num_batches, subchannel_call_.get());
2416   }
2417   CallCombinerClosureList closures;
2418   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2419     PendingBatch* pending = &pending_batches_[i];
2420     grpc_transport_stream_op_batch* batch = pending->batch;
2421     if (batch != nullptr) {
2422       if (batch->recv_trailing_metadata) {
2423         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2424       }
2425       batch->handler_private.extra_arg = subchannel_call_.get();
2426       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2427                         ResumePendingBatchInCallCombiner, batch,
2428                         grpc_schedule_on_exec_ctx);
2429       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2430                    "PendingBatchesResume");
2431       PendingBatchClear(pending);
2432     }
2433   }
2434   // Note: This will release the call combiner.
2435   closures.RunClosures(call_combiner_);
2436 }
2437
2438 template <typename Predicate>
2439 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
2440                                                    const char* log_message,
2441                                                    Predicate predicate) {
2442   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2443   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2444     PendingBatch* pending = &pending_batches_[i];
2445     grpc_transport_stream_op_batch* batch = pending->batch;
2446     if (batch != nullptr && predicate(batch)) {
2447       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2448         gpr_log(GPR_INFO,
2449                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
2450                 this, log_message, i);
2451       }
2452       return pending;
2453     }
2454   }
2455   return nullptr;
2456 }
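
// --- Editor's sketch (addition; not part of the original source) ---
// PendingBatchFind() is a linear scan over the fixed-size pending-batch
// array, parameterized by a caller-supplied predicate (the lambdas passed
// in from the recv_* callbacks below). The same shape in isolation, with a
// hypothetical Batch type:
#include <cstddef>

struct Batch {
  bool recv_message = false;
  bool recv_initial_metadata = false;
};

template <typename Predicate>
Batch* FindFirst(Batch* batches, size_t n, Predicate pred) {
  for (size_t i = 0; i < n; ++i) {
    if (pred(batches[i])) return &batches[i];  // first match wins
  }
  return nullptr;  // nothing pending satisfies the predicate
}
// Usage, mirroring the lookup in InvokeRecvMessageCallback() below:
//   Batch* b = FindFirst(batches, n,
//                        [](const Batch& b) { return b.recv_message; });
// --- end sketch ---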
2457
2458 //
2459 // retry code
2460 //
2461
2462 void CallData::RetryCommit(grpc_call_element* elem,
2463                            SubchannelCallRetryState* retry_state) {
2464   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2465   if (retry_committed_) return;
2466   retry_committed_ = true;
2467   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2468     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2469   }
2470   if (retry_state != nullptr) {
2471     FreeCachedSendOpDataAfterCommit(elem, retry_state);
2472   }
2473 }
2474
2475 void CallData::DoRetry(grpc_call_element* elem,
2476                        SubchannelCallRetryState* retry_state,
2477                        grpc_millis server_pushback_ms) {
2478   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2479   GPR_ASSERT(method_params_ != nullptr);
2480   const auto* retry_policy = method_params_->retry_policy();
2481   GPR_ASSERT(retry_policy != nullptr);
2482   // Reset subchannel call.
2483   subchannel_call_.reset();
2484   // Compute backoff delay.
2485   grpc_millis next_attempt_time;
2486   if (server_pushback_ms >= 0) {
2487     next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
2488     last_attempt_got_server_pushback_ = true;
2489   } else {
2490     if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
2491       retry_backoff_.Init(
2492           BackOff::Options()
2493               .set_initial_backoff(retry_policy->initial_backoff)
2494               .set_multiplier(retry_policy->backoff_multiplier)
2495               .set_jitter(RETRY_BACKOFF_JITTER)
2496               .set_max_backoff(retry_policy->max_backoff));
2497       last_attempt_got_server_pushback_ = false;
2498     }
2499     next_attempt_time = retry_backoff_->NextAttemptTime();
2500   }
2501   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2502     gpr_log(GPR_INFO,
2503             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
2504             this, next_attempt_time - ExecCtx::Get()->Now());
2505   }
2506   // Schedule retry after computed delay.
2507   GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem,
2508                     grpc_schedule_on_exec_ctx);
2509   grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
2510   // Update bookkeeping.
2511   if (retry_state != nullptr) retry_state->retry_dispatched = true;
2512 }
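
// --- Editor's sketch (addition; not part of the original source) ---
// DoRetry() either honors server pushback verbatim or asks BackOff for the
// next attempt time, re-initializing the backoff state on the first retry
// and after any pushback. A stand-alone sketch of exponential backoff with
// jitter; min(initial * multiplier^n, max) scaled by a random factor is the
// conventional formula and is assumed here, not read out of BackOff itself:
#include <algorithm>
#include <cstdint>
#include <random>

int64_t NextBackoffMs(int64_t initial_ms, double multiplier, double jitter,
                      int64_t max_ms, int retries_so_far) {
  double backoff = static_cast<double>(initial_ms);
  for (int i = 0; i < retries_so_far; ++i) backoff *= multiplier;
  backoff = std::min(backoff, static_cast<double>(max_ms));
  // Randomize within [1 - jitter, 1 + jitter] so that a burst of failed
  // calls does not retry in lockstep.
  static thread_local std::mt19937 rng{std::random_device{}()};
  std::uniform_real_distribution<double> dist(1.0 - jitter, 1.0 + jitter);
  return static_cast<int64_t>(backoff * dist(rng));
}
// --- end sketch ---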
2513
2514 bool CallData::MaybeRetry(grpc_call_element* elem,
2515                           SubchannelCallBatchData* batch_data,
2516                           grpc_status_code status,
2517                           grpc_mdelem* server_pushback_md) {
2518   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2519   // Get retry policy.
2520   if (method_params_ == nullptr) return false;
2521   const auto* retry_policy = method_params_->retry_policy();
2522   if (retry_policy == nullptr) return false;
2523   // If we've already dispatched a retry from this call, return true.
2524   // This catches the case where the batch has multiple callbacks
2525   // (i.e., it includes either recv_message or recv_initial_metadata).
2526   SubchannelCallRetryState* retry_state = nullptr;
2527   if (batch_data != nullptr) {
2528     retry_state = static_cast<SubchannelCallRetryState*>(
2529         batch_data->subchannel_call->GetParentData());
2530     if (retry_state->retry_dispatched) {
2531       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2532         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
2533                 this);
2534       }
2535       return true;
2536     }
2537   }
2538   // Check status.
2539   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
2540     if (retry_throttle_data_ != nullptr) {
2541       retry_throttle_data_->RecordSuccess();
2542     }
2543     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2544       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
2545     }
2546     return false;
2547   }
2548   // Status is not OK.  Check whether the status is retryable.
2549   if (!retry_policy->retryable_status_codes.Contains(status)) {
2550     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2551       gpr_log(GPR_INFO,
2552               "chand=%p calld=%p: status %s not configured as retryable", chand,
2553               this, grpc_status_code_to_string(status));
2554     }
2555     return false;
2556   }
2557   // Record the failure and check whether retries are throttled.
2558   // Note that it's important for this check to come after the status
2559   // code check above, since we should only record failures whose statuses
2560   // match the configured retryable status codes, so that we don't count
2561   // things like failures due to malformed requests (INVALID_ARGUMENT).
2562   // Conversely, it's important for this to come before the remaining
2563   // checks, so that we don't fail to record failures due to other factors.
2564   if (retry_throttle_data_ != nullptr &&
2565       !retry_throttle_data_->RecordFailure()) {
2566     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2567       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
2568     }
2569     return false;
2570   }
2571   // Check whether the call is committed.
2572   if (retry_committed_) {
2573     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2574       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
2575               this);
2576     }
2577     return false;
2578   }
2579   // Check whether we have retries remaining.
2580   ++num_attempts_completed_;
2581   if (num_attempts_completed_ >= retry_policy->max_attempts) {
2582     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2583       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
2584               this, retry_policy->max_attempts);
2585     }
2586     return false;
2587   }
2588   // If the call was cancelled from the surface, don't retry.
2589   if (cancel_error_ != GRPC_ERROR_NONE) {
2590     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2591       gpr_log(GPR_INFO,
2592               "chand=%p calld=%p: call cancelled from surface, not retrying",
2593               chand, this);
2594     }
2595     return false;
2596   }
2597   // Check server push-back.
2598   grpc_millis server_pushback_ms = -1;
2599   if (server_pushback_md != nullptr) {
2600     // If the value is "-1" or any other unparseable string, we do not retry.
2601     uint32_t ms;
2602     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
2603       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2604         gpr_log(GPR_INFO,
2605                 "chand=%p calld=%p: not retrying due to server push-back",
2606                 chand, this);
2607       }
2608       return false;
2609     } else {
2610       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2611         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
2612                 chand, this, ms);
2613       }
2614       server_pushback_ms = (grpc_millis)ms;
2615     }
2616   }
2617   DoRetry(elem, retry_state, server_pushback_ms);
2618   return true;
2619 }
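
// --- Editor's sketch (addition; not part of the original source) ---
// The pushback handling in MaybeRetry() treats grpc-retry-pushback-ms as
// "retry after N ms" only when the value parses as an unsigned integer;
// "-1" or any other garbage means "do not retry at all". A strtoull-based
// parse with the same contract (a sketch, not grpc_parse_slice_to_uint32
// itself):
#include <cerrno>
#include <cstdint>
#include <cstdlib>

// Returns true and sets *ms on success; false means the server is telling
// the client not to retry.
bool ParsePushbackMs(const char* value, uint32_t* ms) {
  if (value == nullptr || *value == '\0' || *value == '-') return false;
  errno = 0;
  char* end = nullptr;
  const unsigned long long parsed = std::strtoull(value, &end, 10);
  if (errno != 0 || *end != '\0' || parsed > UINT32_MAX) return false;
  *ms = static_cast<uint32_t>(parsed);
  return true;
}
// --- end sketch ---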
2620
2621 //
2622 // CallData::SubchannelCallBatchData
2623 //
2624
2625 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2626     grpc_call_element* elem, int refcount, bool set_on_complete) {
2627   CallData* calld = static_cast<CallData*>(elem->call_data);
2628   return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2629                                                      set_on_complete);
2630 }
2631
2632 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2633     grpc_call_element* elem, CallData* calld, int refcount,
2634     bool set_on_complete)
2635     : elem(elem), subchannel_call(calld->subchannel_call_) {
2636   SubchannelCallRetryState* retry_state =
2637       static_cast<SubchannelCallRetryState*>(
2638           calld->subchannel_call_->GetParentData());
2639   batch.payload = &retry_state->batch_payload;
2640   gpr_ref_init(&refs, refcount);
2641   if (set_on_complete) {
2642     GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2643                       grpc_schedule_on_exec_ctx);
2644     batch.on_complete = &on_complete;
2645   }
2646   GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2647 }
2648
2649 void CallData::SubchannelCallBatchData::Destroy() {
2650   SubchannelCallRetryState* retry_state =
2651       static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2652   if (batch.send_initial_metadata) {
2653     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2654   }
2655   if (batch.send_trailing_metadata) {
2656     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2657   }
2658   if (batch.recv_initial_metadata) {
2659     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2660   }
2661   if (batch.recv_trailing_metadata) {
2662     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2663   }
2664   subchannel_call.reset();
2665   CallData* calld = static_cast<CallData*>(elem->call_data);
2666   GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
2667 }
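
// --- Editor's sketch (addition; not part of the original source) ---
// SubchannelCallBatchData is arena-allocated and manually ref-counted:
// Create() seeds the count with one ref per expected callback, and the
// Unref() calls sprinkled through the callbacks below run Destroy() on the
// last release. The bare pattern, with std::atomic standing in for
// gpr_refcount (arena storage means Destroy() tears down members but never
// frees the object itself):
#include <atomic>

class BatchData {
 public:
  explicit BatchData(int refcount) : refs_(refcount) {}
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  void Unref() {
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      Destroy();  // last ref gone: release metadata batches, drop call ref
    }
  }

 private:
  void Destroy() { /* member teardown only; memory belongs to the arena */ }
  std::atomic<int> refs_;
};
// --- end sketch ---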
2668
2669 //
2670 // recv_initial_metadata callback handling
2671 //
2672
2673 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2674   SubchannelCallBatchData* batch_data =
2675       static_cast<SubchannelCallBatchData*>(arg);
2676   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2677   // Find pending batch.
2678   PendingBatch* pending = calld->PendingBatchFind(
2679       batch_data->elem, "invoking recv_initial_metadata_ready for",
2680       [](grpc_transport_stream_op_batch* batch) {
2681         return batch->recv_initial_metadata &&
2682                batch->payload->recv_initial_metadata
2683                        .recv_initial_metadata_ready != nullptr;
2684       });
2685   GPR_ASSERT(pending != nullptr);
2686   // Return metadata.
2687   SubchannelCallRetryState* retry_state =
2688       static_cast<SubchannelCallRetryState*>(
2689           batch_data->subchannel_call->GetParentData());
2690   grpc_metadata_batch_move(
2691       &retry_state->recv_initial_metadata,
2692       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2693   // Update bookkeeping.
2694   // Note: Need to do this before invoking the callback, since invoking
2695   // the callback will result in yielding the call combiner.
2696   grpc_closure* recv_initial_metadata_ready =
2697       pending->batch->payload->recv_initial_metadata
2698           .recv_initial_metadata_ready;
2699   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2700       nullptr;
2701   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2702   batch_data->Unref();
2703   // Invoke callback.
2704   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2705 }
2706
2707 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2708   SubchannelCallBatchData* batch_data =
2709       static_cast<SubchannelCallBatchData*>(arg);
2710   grpc_call_element* elem = batch_data->elem;
2711   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2712   CallData* calld = static_cast<CallData*>(elem->call_data);
2713   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2714     gpr_log(GPR_INFO,
2715             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2716             chand, calld, grpc_error_string(error));
2717   }
2718   SubchannelCallRetryState* retry_state =
2719       static_cast<SubchannelCallRetryState*>(
2720           batch_data->subchannel_call->GetParentData());
2721   retry_state->completed_recv_initial_metadata = true;
2722   // If a retry was already dispatched, then we're not going to use the
2723   // result of this recv_initial_metadata op, so do nothing.
2724   if (retry_state->retry_dispatched) {
2725     GRPC_CALL_COMBINER_STOP(
2726         calld->call_combiner_,
2727         "recv_initial_metadata_ready after retry dispatched");
2728     return;
2729   }
2730   // If we got an error or a Trailers-Only response and have not yet gotten
2731   // the recv_trailing_metadata_ready callback, then defer propagating this
2732   // callback back to the surface.  We can evaluate whether to retry when
2733   // recv_trailing_metadata comes back.
2734   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2735                     error != GRPC_ERROR_NONE) &&
2736                    !retry_state->completed_recv_trailing_metadata)) {
2737     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2738       gpr_log(GPR_INFO,
2739               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2740               "(Trailers-Only)",
2741               chand, calld);
2742     }
2743     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2744     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2745     if (!retry_state->started_recv_trailing_metadata) {
2746       // recv_trailing_metadata not yet started by application; start it
2747       // ourselves to get status.
2748       calld->StartInternalRecvTrailingMetadata(elem);
2749     } else {
2750       GRPC_CALL_COMBINER_STOP(
2751           calld->call_combiner_,
2752           "recv_initial_metadata_ready trailers-only or error");
2753     }
2754     return;
2755   }
2756   // Received valid initial metadata, so commit the call.
2757   calld->RetryCommit(elem, retry_state);
2758   // Invoke the callback to return the result to the surface.
2759   // Manually invoking a callback function; it does not take ownership of error.
2760   calld->InvokeRecvInitialMetadataCallback(batch_data, error);
2761 }
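
// --- Editor's sketch (addition; not part of the original source) ---
// The deferral above is a small state machine: when the op failed (or looks
// Trailers-Only) and trailing metadata has not yet arrived, the surface
// callback is parked; once recv_trailing_metadata completes, the parked
// callback is either replayed (call committed, no retry) or discarded
// (retrying). A hypothetical condensed form of that handoff:
#include <functional>
#include <utility>

struct RecvState {
  bool completed_recv_trailing_metadata = false;
  std::function<void()> deferred;  // parked surface callback, if any
};

void OnRecvInitialMetadata(RecvState* s, bool failed_or_trailers_only,
                           std::function<void()> surface_cb) {
  if (failed_or_trailers_only && !s->completed_recv_trailing_metadata) {
    s->deferred = std::move(surface_cb);  // decide once final status is known
    return;
  }
  surface_cb();  // healthy response: commit the call and propagate now
}
// --- end sketch ---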
2762
2763 //
2764 // recv_message callback handling
2765 //
2766
2767 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2768   SubchannelCallBatchData* batch_data =
2769       static_cast<SubchannelCallBatchData*>(arg);
2770   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2771   // Find pending op.
2772   PendingBatch* pending = calld->PendingBatchFind(
2773       batch_data->elem, "invoking recv_message_ready for",
2774       [](grpc_transport_stream_op_batch* batch) {
2775         return batch->recv_message &&
2776                batch->payload->recv_message.recv_message_ready != nullptr;
2777       });
2778   GPR_ASSERT(pending != nullptr);
2779   // Return payload.
2780   SubchannelCallRetryState* retry_state =
2781       static_cast<SubchannelCallRetryState*>(
2782           batch_data->subchannel_call->GetParentData());
2783   *pending->batch->payload->recv_message.recv_message =
2784       std::move(retry_state->recv_message);
2785   // Update bookkeeping.
2786   // Note: Need to do this before invoking the callback, since invoking
2787   // the callback will result in yielding the call combiner.
2788   grpc_closure* recv_message_ready =
2789       pending->batch->payload->recv_message.recv_message_ready;
2790   pending->batch->payload->recv_message.recv_message_ready = nullptr;
2791   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2792   batch_data->Unref();
2793   // Invoke callback.
2794   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2795 }
2796
2797 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2798   SubchannelCallBatchData* batch_data =
2799       static_cast<SubchannelCallBatchData*>(arg);
2800   grpc_call_element* elem = batch_data->elem;
2801   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2802   CallData* calld = static_cast<CallData*>(elem->call_data);
2803   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2804     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2805             chand, calld, grpc_error_string(error));
2806   }
2807   SubchannelCallRetryState* retry_state =
2808       static_cast<SubchannelCallRetryState*>(
2809           batch_data->subchannel_call->GetParentData());
2810   ++retry_state->completed_recv_message_count;
2811   // If a retry was already dispatched, then we're not going to use the
2812   // result of this recv_message op, so do nothing.
2813   if (retry_state->retry_dispatched) {
2814     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2815                             "recv_message_ready after retry dispatched");
2816     return;
2817   }
2818   // If we got an error or the payload was nullptr and we have not yet gotten
2819   // the recv_trailing_metadata_ready callback, then defer propagating this
2820   // callback back to the surface.  We can evaluate whether to retry when
2821   // recv_trailing_metadata comes back.
2822   if (GPR_UNLIKELY(
2823           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2824           !retry_state->completed_recv_trailing_metadata)) {
2825     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2826       gpr_log(GPR_INFO,
2827               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2828               "message and recv_trailing_metadata pending)",
2829               chand, calld);
2830     }
2831     retry_state->recv_message_ready_deferred_batch = batch_data;
2832     retry_state->recv_message_error = GRPC_ERROR_REF(error);
2833     if (!retry_state->started_recv_trailing_metadata) {
2834       // recv_trailing_metadata not yet started by application; start it
2835       // ourselves to get status.
2836       calld->StartInternalRecvTrailingMetadata(elem);
2837     } else {
2838       GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2839     }
2840     return;
2841   }
2842   // Received a valid message, so commit the call.
2843   calld->RetryCommit(elem, retry_state);
2844   // Invoke the callback to return the result to the surface.
2845   // Manually invoking a callback function; it does not take ownership of error.
2846   calld->InvokeRecvMessageCallback(batch_data, error);
2847 }
2848
2849 //
2850 // recv_trailing_metadata handling
2851 //
2852
2853 void CallData::GetCallStatus(grpc_call_element* elem,
2854                              grpc_metadata_batch* md_batch, grpc_error* error,
2855                              grpc_status_code* status,
2856                              grpc_mdelem** server_pushback_md) {
2857   if (error != GRPC_ERROR_NONE) {
2858     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2859   } else {
2860     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2861     *status =
2862         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2863     if (server_pushback_md != nullptr &&
2864         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2865       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2866     }
2867   }
2868   GRPC_ERROR_UNREF(error);
2869 }
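
// --- Editor's sketch (addition; not part of the original source) ---
// GetCallStatus() prefers a status derived from the transport error; only a
// clean completion reads grpc-status out of the trailing metadata (and, as
// a side effect, spots grpc-retry-pushback-ms). The precedence rule in
// isolation, with std::optional as a stand-in for grpc_error*:
#include <optional>

int EffectiveStatus(std::optional<int> status_from_error,
                    int status_from_trailers) {
  // A stream-level error wins; otherwise trust the server's trailer.
  return status_from_error.has_value() ? *status_from_error
                                       : status_from_trailers;
}
// --- end sketch ---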
2870
2871 void CallData::AddClosureForRecvTrailingMetadataReady(
2872     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2873     grpc_error* error, CallCombinerClosureList* closures) {
2874   // Find pending batch.
2875   PendingBatch* pending = PendingBatchFind(
2876       elem, "invoking recv_trailing_metadata for",
2877       [](grpc_transport_stream_op_batch* batch) {
2878         return batch->recv_trailing_metadata &&
2879                batch->payload->recv_trailing_metadata
2880                        .recv_trailing_metadata_ready != nullptr;
2881       });
2882   // If we generated the recv_trailing_metadata op internally via
2883   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2884   if (pending == nullptr) {
2885     GRPC_ERROR_UNREF(error);
2886     return;
2887   }
2888   // Return metadata.
2889   SubchannelCallRetryState* retry_state =
2890       static_cast<SubchannelCallRetryState*>(
2891           batch_data->subchannel_call->GetParentData());
2892   grpc_metadata_batch_move(
2893       &retry_state->recv_trailing_metadata,
2894       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2895   // Add closure.
2896   closures->Add(pending->batch->payload->recv_trailing_metadata
2897                     .recv_trailing_metadata_ready,
2898                 error, "recv_trailing_metadata_ready for pending batch");
2899   // Update bookkeeping.
2900   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2901       nullptr;
2902   MaybeClearPendingBatch(elem, pending);
2903 }
2904
2905 void CallData::AddClosuresForDeferredRecvCallbacks(
2906     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2907     CallCombinerClosureList* closures) {
2908   if (batch_data->batch.recv_trailing_metadata) {
2909     // Add closure for deferred recv_initial_metadata_ready.
2910     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2911                      nullptr)) {
2912       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2913                         InvokeRecvInitialMetadataCallback,
2914                         retry_state->recv_initial_metadata_ready_deferred_batch,
2915                         grpc_schedule_on_exec_ctx);
2916       closures->Add(&retry_state->recv_initial_metadata_ready,
2917                     retry_state->recv_initial_metadata_error,
2918                     "resuming recv_initial_metadata_ready");
2919       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2920     }
2921     // Add closure for deferred recv_message_ready.
2922     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2923                      nullptr)) {
2924       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2925                         InvokeRecvMessageCallback,
2926                         retry_state->recv_message_ready_deferred_batch,
2927                         grpc_schedule_on_exec_ctx);
2928       closures->Add(&retry_state->recv_message_ready,
2929                     retry_state->recv_message_error,
2930                     "resuming recv_message_ready");
2931       retry_state->recv_message_ready_deferred_batch = nullptr;
2932     }
2933   }
2934 }
2935
2936 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2937                                        SubchannelCallRetryState* retry_state) {
2938   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2939     return false;
2940   }
2941   if (pending->batch->send_initial_metadata &&
2942       !retry_state->started_send_initial_metadata) {
2943     return true;
2944   }
2945   if (pending->batch->send_message &&
2946       retry_state->started_send_message_count < send_messages_.size()) {
2947     return true;
2948   }
2949   if (pending->batch->send_trailing_metadata &&
2950       !retry_state->started_send_trailing_metadata) {
2951     return true;
2952   }
2953   return false;
2954 }
2955
2956 void CallData::AddClosuresToFailUnstartedPendingBatches(
2957     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2958     grpc_error* error, CallCombinerClosureList* closures) {
2959   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2960   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2961     PendingBatch* pending = &pending_batches_[i];
2962     if (PendingBatchIsUnstarted(pending, retry_state)) {
2963       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2964         gpr_log(GPR_INFO,
2965                 "chand=%p calld=%p: failing unstarted pending batch at index "
2966                 "%" PRIuPTR,
2967                 chand, this, i);
2968       }
2969       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2970                     "failing on_complete for pending batch");
2971       pending->batch->on_complete = nullptr;
2972       MaybeClearPendingBatch(elem, pending);
2973     }
2974   }
2975   GRPC_ERROR_UNREF(error);
2976 }
2977
2978 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2979                                            grpc_error* error) {
2980   grpc_call_element* elem = batch_data->elem;
2981   SubchannelCallRetryState* retry_state =
2982       static_cast<SubchannelCallRetryState*>(
2983           batch_data->subchannel_call->GetParentData());
2984   // Construct list of closures to execute.
2985   CallCombinerClosureList closures;
2986   // First, add closure for recv_trailing_metadata_ready.
2987   AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2988                                          GRPC_ERROR_REF(error), &closures);
2989   // If there are deferred recv_initial_metadata_ready or recv_message_ready
2990   // callbacks, add them to closures.
2991   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2992   // Add closures to fail any pending batches that have not yet been started.
2993   AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2994                                            GRPC_ERROR_REF(error), &closures);
2995   // Don't need batch_data anymore.
2996   batch_data->Unref();
2997   // Schedule all of the closures identified above.
2998   // Note: This will release the call combiner.
2999   closures.RunClosures(call_combiner_);
3000   GRPC_ERROR_UNREF(error);
3001 }
3002
3003 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
3004   SubchannelCallBatchData* batch_data =
3005       static_cast<SubchannelCallBatchData*>(arg);
3006   grpc_call_element* elem = batch_data->elem;
3007   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3008   CallData* calld = static_cast<CallData*>(elem->call_data);
3009   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3010     gpr_log(GPR_INFO,
3011             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
3012             chand, calld, grpc_error_string(error));
3013   }
3014   SubchannelCallRetryState* retry_state =
3015       static_cast<SubchannelCallRetryState*>(
3016           batch_data->subchannel_call->GetParentData());
3017   retry_state->completed_recv_trailing_metadata = true;
3018   // Get the call's status and check for server pushback metadata.
3019   grpc_status_code status = GRPC_STATUS_OK;
3020   grpc_mdelem* server_pushback_md = nullptr;
3021   grpc_metadata_batch* md_batch =
3022       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
3023   calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
3024                        &server_pushback_md);
3025   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3026     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
3027             calld, grpc_status_code_to_string(status));
3028   }
3029   // Check if we should retry.
3030   if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
3031     // Unref batch_data for deferred recv_initial_metadata_ready or
3032     // recv_message_ready callbacks, if any.
3033     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
3034       batch_data->Unref();
3035       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
3036     }
3037     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
3038       batch_data->Unref();
3039       GRPC_ERROR_UNREF(retry_state->recv_message_error);
3040     }
3041     batch_data->Unref();
3042     return;
3043   }
3044   // Not retrying, so commit the call.
3045   calld->RetryCommit(elem, retry_state);
3046   // Run any necessary closures.
3047   calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
3048 }
3049
3050 //
3051 // on_complete callback handling
3052 //
3053
3054 void CallData::AddClosuresForCompletedPendingBatch(
3055     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
3056     SubchannelCallRetryState* retry_state, grpc_error* error,
3057     CallCombinerClosureList* closures) {
3058   PendingBatch* pending = PendingBatchFind(
3059       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
3060         // Match the pending batch with the same set of send ops as the
3061         // subchannel batch we've just completed.
3062         return batch->on_complete != nullptr &&
3063                batch_data->batch.send_initial_metadata ==
3064                    batch->send_initial_metadata &&
3065                batch_data->batch.send_message == batch->send_message &&
3066                batch_data->batch.send_trailing_metadata ==
3067                    batch->send_trailing_metadata;
3068       });
3069   // If batch_data is a replay batch, then there will be no pending
3070   // batch to complete.
3071   if (pending == nullptr) {
3072     GRPC_ERROR_UNREF(error);
3073     return;
3074   }
3075   // Add closure.
3076   closures->Add(pending->batch->on_complete, error,
3077                 "on_complete for pending batch");
3078   pending->batch->on_complete = nullptr;
3079   MaybeClearPendingBatch(elem, pending);
3080 }
3081
3082 void CallData::AddClosuresForReplayOrPendingSendOps(
3083     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
3084     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
3085   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3086   bool have_pending_send_message_ops =
3087       retry_state->started_send_message_count < send_messages_.size();
3088   bool have_pending_send_trailing_metadata_op =
3089       seen_send_trailing_metadata_ &&
3090       !retry_state->started_send_trailing_metadata;
3091   if (!have_pending_send_message_ops &&
3092       !have_pending_send_trailing_metadata_op) {
3093     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3094       PendingBatch* pending = &pending_batches_[i];
3095       grpc_transport_stream_op_batch* batch = pending->batch;
3096       if (batch == nullptr || pending->send_ops_cached) continue;
3097       if (batch->send_message) have_pending_send_message_ops = true;
3098       if (batch->send_trailing_metadata) {
3099         have_pending_send_trailing_metadata_op = true;
3100       }
3101     }
3102   }
3103   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
3104     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3105       gpr_log(GPR_INFO,
3106               "chand=%p calld=%p: starting next batch for pending send op(s)",
3107               chand, this);
3108     }
3109     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
3110                       StartRetriableSubchannelBatches, elem,
3111                       grpc_schedule_on_exec_ctx);
3112     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
3113                   "starting next batch for send_* op(s)");
3114   }
3115 }
3116
3117 void CallData::OnComplete(void* arg, grpc_error* error) {
3118   SubchannelCallBatchData* batch_data =
3119       static_cast<SubchannelCallBatchData*>(arg);
3120   grpc_call_element* elem = batch_data->elem;
3121   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3122   CallData* calld = static_cast<CallData*>(elem->call_data);
3123   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3124     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
3125     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
3126             chand, calld, grpc_error_string(error), batch_str);
3127     gpr_free(batch_str);
3128   }
3129   SubchannelCallRetryState* retry_state =
3130       static_cast<SubchannelCallRetryState*>(
3131           batch_data->subchannel_call->GetParentData());
3132   // Update bookkeeping in retry_state.
3133   if (batch_data->batch.send_initial_metadata) {
3134     retry_state->completed_send_initial_metadata = true;
3135   }
3136   if (batch_data->batch.send_message) {
3137     ++retry_state->completed_send_message_count;
3138   }
3139   if (batch_data->batch.send_trailing_metadata) {
3140     retry_state->completed_send_trailing_metadata = true;
3141   }
3142   // If the call is committed, free cached data for send ops that we've just
3143   // completed.
3144   if (calld->retry_committed_) {
3145     calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
3146   }
3147   // Construct list of closures to execute.
3148   CallCombinerClosureList closures;
3149   // If a retry was already dispatched, that means we saw
3150   // recv_trailing_metadata before this, so we do nothing here.
3151   // Otherwise, invoke the callback to return the result to the surface.
3152   if (!retry_state->retry_dispatched) {
3153     // Add closure for the completed pending batch, if any.
3154     calld->AddClosuresForCompletedPendingBatch(
3155         elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
3156     // If needed, add a callback to start any replay or pending send ops on
3157     // the subchannel call.
3158     if (!retry_state->completed_recv_trailing_metadata) {
3159       calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
3160                                                   &closures);
3161     }
3162   }
3163   // Track number of pending subchannel send batches and determine if this
3164   // was the last one.
3165   --calld->num_pending_retriable_subchannel_send_batches_;
3166   const bool last_send_batch_complete =
3167       calld->num_pending_retriable_subchannel_send_batches_ == 0;
3168   // Don't need batch_data anymore.
3169   batch_data->Unref();
3170   // Schedule all of the closures identified above.
3171   // Note: This yields the call combiner.
3172   closures.RunClosures(calld->call_combiner_);
3173   // If this was the last subchannel send batch, unref the call stack.
3174   if (last_send_batch_complete) {
3175     GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
3176   }
3177 }
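
// --- Editor's sketch (addition; not part of the original source) ---
// The bookkeeping at the end of OnComplete() pins the call stack while any
// retriable send batch is in flight: a ref is taken on the 0 -> 1 transition
// (in the batch-construction code below) and dropped on the 1 -> 0
// transition here, deliberately after RunClosures() so the stack outlives
// the scheduled closures. The transition pattern by itself (Pin/Unpin are
// hypothetical stand-ins for GRPC_CALL_STACK_REF/UNREF):

struct CallStackPin {
  int pending_batches = 0;  // the call combiner serializes access
  void OnSendBatchStarted() {
    if (pending_batches++ == 0) Pin();    // first in-flight batch holds it
  }
  void OnSendBatchComplete() {
    if (--pending_batches == 0) Unpin();  // last completion releases it
  }
  void Pin() { /* GRPC_CALL_STACK_REF equivalent */ }
  void Unpin() { /* GRPC_CALL_STACK_UNREF equivalent */ }
};
// --- end sketch ---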
3178
3179 //
3180 // subchannel batch construction
3181 //
3182
3183 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
3184   grpc_transport_stream_op_batch* batch =
3185       static_cast<grpc_transport_stream_op_batch*>(arg);
3186   SubchannelCall* subchannel_call =
3187       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
3188   // Note: This will release the call combiner.
3189   subchannel_call->StartTransportStreamOpBatch(batch);
3190 }
3191
3192 void CallData::AddClosureForSubchannelBatch(
3193     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
3194     CallCombinerClosureList* closures) {
3195   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3196   batch->handler_private.extra_arg = subchannel_call_.get();
3197   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
3198                     batch, grpc_schedule_on_exec_ctx);
3199   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3200     char* batch_str = grpc_transport_stream_op_batch_string(batch);
3201     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
3202             this, batch_str);
3203     gpr_free(batch_str);
3204   }
3205   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
3206                 "start_subchannel_batch");
3207 }
3208
3209 void CallData::AddRetriableSendInitialMetadataOp(
3210     SubchannelCallRetryState* retry_state,
3211     SubchannelCallBatchData* batch_data) {
3212   // Maps the number of retries to the corresponding metadata value slice.
3213   const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
3214                                              &GRPC_MDSTR_3, &GRPC_MDSTR_4};
3215   // We need to make a copy of the metadata batch for each attempt, since
3216   // the filters in the subchannel stack may modify this batch, and we don't
3217   // want those modifications to be passed forward to subsequent attempts.
3218   //
3219   // If we've already completed one or more attempts, add the
3220   // grpc-retry-attempts header.
3221   retry_state->send_initial_metadata_storage =
3222       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3223           sizeof(grpc_linked_mdelem) *
3224           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
3225   grpc_metadata_batch_copy(&send_initial_metadata_,
3226                            &retry_state->send_initial_metadata,
3227                            retry_state->send_initial_metadata_storage);
3228   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
3229                        .grpc_previous_rpc_attempts != nullptr)) {
3230     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
3231                                GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
3232   }
3233   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
3234     grpc_mdelem retry_md = grpc_mdelem_create(
3235         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
3236         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
3237     grpc_error* error = grpc_metadata_batch_add_tail(
3238         &retry_state->send_initial_metadata,
3239         &retry_state
3240              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
3241         retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
3242     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3243       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
3244               grpc_error_string(error));
3245       GPR_ASSERT(false);
3246     }
3247   }
3248   retry_state->started_send_initial_metadata = true;
3249   batch_data->batch.send_initial_metadata = true;
3250   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
3251       &retry_state->send_initial_metadata;
3252   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
3253       send_initial_metadata_flags_;
3254   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
3255 }
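
// --- Editor's sketch (addition; not part of the original source) ---
// The retry_count_strings table above maps num_attempts_completed_ (1..4)
// to a pre-interned slice for the grpc-previous-rpc-attempts header; four
// entries suffice because this implementation caps max_attempts at five,
// so at most four attempts can precede the one being sent. The lookup in
// plain C++:
#include <cassert>
#include <cstddef>

const char* PreviousAttemptsHeaderValue(size_t num_attempts_completed) {
  static const char* kValues[] = {"1", "2", "3", "4"};
  assert(num_attempts_completed >= 1 &&
         num_attempts_completed <= sizeof(kValues) / sizeof(kValues[0]));
  return kValues[num_attempts_completed - 1];
}
// --- end sketch ---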
3256
3257 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
3258                                          SubchannelCallRetryState* retry_state,
3259                                          SubchannelCallBatchData* batch_data) {
3260   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3261   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3262     gpr_log(GPR_INFO,
3263             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
3264             chand, this, retry_state->started_send_message_count);
3265   }
3266   ByteStreamCache* cache =
3267       send_messages_[retry_state->started_send_message_count];
3268   ++retry_state->started_send_message_count;
3269   retry_state->send_message.Init(cache);
3270   batch_data->batch.send_message = true;
3271   batch_data->batch.payload->send_message.send_message.reset(
3272       retry_state->send_message.get());
3273 }
3274
3275 void CallData::AddRetriableSendTrailingMetadataOp(
3276     SubchannelCallRetryState* retry_state,
3277     SubchannelCallBatchData* batch_data) {
3278   // We need to make a copy of the metadata batch for each attempt, since
3279   // the filters in the subchannel stack may modify this batch, and we don't
3280   // want those modifications to be passed forward to subsequent attempts.
3281   retry_state->send_trailing_metadata_storage =
3282       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3283           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
3284   grpc_metadata_batch_copy(&send_trailing_metadata_,
3285                            &retry_state->send_trailing_metadata,
3286                            retry_state->send_trailing_metadata_storage);
3287   retry_state->started_send_trailing_metadata = true;
3288   batch_data->batch.send_trailing_metadata = true;
3289   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
3290       &retry_state->send_trailing_metadata;
3291 }
3292
3293 void CallData::AddRetriableRecvInitialMetadataOp(
3294     SubchannelCallRetryState* retry_state,
3295     SubchannelCallBatchData* batch_data) {
3296   retry_state->started_recv_initial_metadata = true;
3297   batch_data->batch.recv_initial_metadata = true;
3298   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
3299   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
3300       &retry_state->recv_initial_metadata;
3301   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
3302       &retry_state->trailing_metadata_available;
3303   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
3304                     RecvInitialMetadataReady, batch_data,
3305                     grpc_schedule_on_exec_ctx);
3306   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
3307       &retry_state->recv_initial_metadata_ready;
3308 }
3309
3310 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
3311                                          SubchannelCallBatchData* batch_data) {
3312   ++retry_state->started_recv_message_count;
3313   batch_data->batch.recv_message = true;
3314   batch_data->batch.payload->recv_message.recv_message =
3315       &retry_state->recv_message;
3316   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
3317                     batch_data, grpc_schedule_on_exec_ctx);
3318   batch_data->batch.payload->recv_message.recv_message_ready =
3319       &retry_state->recv_message_ready;
3320 }
3321
3322 void CallData::AddRetriableRecvTrailingMetadataOp(
3323     SubchannelCallRetryState* retry_state,
3324     SubchannelCallBatchData* batch_data) {
3325   retry_state->started_recv_trailing_metadata = true;
3326   batch_data->batch.recv_trailing_metadata = true;
3327   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
3328   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
3329       &retry_state->recv_trailing_metadata;
3330   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
3331       &retry_state->collect_stats;
3332   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
3333                     RecvTrailingMetadataReady, batch_data,
3334                     grpc_schedule_on_exec_ctx);
3335   batch_data->batch.payload->recv_trailing_metadata
3336       .recv_trailing_metadata_ready =
3337       &retry_state->recv_trailing_metadata_ready;
3338   MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
3339       &batch_data->batch);
3340 }
3341
3342 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
3343   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3344   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3345     gpr_log(GPR_INFO,
3346             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
3347             "started; starting it internally",
3348             chand, this);
3349   }
3350   SubchannelCallRetryState* retry_state =
3351       static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
3352   // Create batch_data with 2 refs, since this batch will be unreffed twice:
3353   // once for the recv_trailing_metadata_ready callback when the subchannel
3354   // batch returns, and again when we actually get a recv_trailing_metadata
3355   // op from the surface.
3356   SubchannelCallBatchData* batch_data =
3357       SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
3358   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3359   retry_state->recv_trailing_metadata_internal_batch = batch_data;
3360   // Note: This will release the call combiner.
3361   subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
3362 }
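
// --- Editor's sketch (addition; not part of the original source) ---
// The "2 refs" above encode two independent consumers of one batch result:
// the transport's recv_trailing_metadata_ready callback, and the surface's
// own recv_trailing_metadata op that later replays the stored result. Each
// consumer drops one ref; the second drop destroys the batch. Shape of that
// handoff with hypothetical stand-ins (non-atomic refcount is fine here
// because the call combiner serializes both consumers):
#include <functional>

struct TwoRefBatch {
  int refs = 2;  // one per consumer, as in Create(elem, 2, ...)
  void Unref() {
    if (--refs == 0) { /* destroy batch */ }
  }
};

void WireConsumers(TwoRefBatch* b, std::function<void()>* transport_done,
                   std::function<void()>* surface_done) {
  *transport_done = [b] { /* record final status */ b->Unref(); };
  *surface_done = [b] { /* replay recorded status */ b->Unref(); };
}
// --- end sketch ---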
3363
3364 // If there are any cached send ops that need to be replayed on the
3365 // current subchannel call, creates and returns a new subchannel batch
3366 // to replay those ops.  Otherwise, returns nullptr.
3367 CallData::SubchannelCallBatchData*
3368 CallData::MaybeCreateSubchannelBatchForReplay(
3369     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
3370   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3371   SubchannelCallBatchData* replay_batch_data = nullptr;
3372   // send_initial_metadata.
3373   if (seen_send_initial_metadata_ &&
3374       !retry_state->started_send_initial_metadata &&
3375       !pending_send_initial_metadata_) {
3376     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3377       gpr_log(GPR_INFO,
3378               "chand=%p calld=%p: replaying previously completed "
3379               "send_initial_metadata op",
3380               chand, this);
3381     }
3382     replay_batch_data =
3383         SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3384     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
3385   }
3386   // send_message.
3387   // Note that we can only have one send_message op in flight at a time.
3388   if (retry_state->started_send_message_count < send_messages_.size() &&
3389       retry_state->started_send_message_count ==
3390           retry_state->completed_send_message_count &&
3391       !pending_send_message_) {
3392     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3393       gpr_log(GPR_INFO,
3394               "chand=%p calld=%p: replaying previously completed "
3395               "send_message op",
3396               chand, this);
3397     }
3398     if (replay_batch_data == nullptr) {
3399       replay_batch_data =
3400           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3401     }
3402     AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
3403   }
3404   // send_trailing_metadata.
3405   // Note that we only add this op if we have no more send_message ops
3406   // to start, since we can't send down any more send_message ops after
3407   // send_trailing_metadata.
3408   if (seen_send_trailing_metadata_ &&
3409       retry_state->started_send_message_count == send_messages_.size() &&
3410       !retry_state->started_send_trailing_metadata &&
3411       !pending_send_trailing_metadata_) {
3412     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3413       gpr_log(GPR_INFO,
3414               "chand=%p calld=%p: replaying previously completed "
3415               "send_trailing_metadata op",
3416               chand, this);
3417     }
3418     if (replay_batch_data == nullptr) {
3419       replay_batch_data =
3420           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3421     }
3422     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
3423   }
3424   return replay_batch_data;
3425 }
3426
3427 void CallData::AddSubchannelBatchesForPendingBatches(
3428     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
3429     CallCombinerClosureList* closures) {
3430   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3431     PendingBatch* pending = &pending_batches_[i];
3432     grpc_transport_stream_op_batch* batch = pending->batch;
3433     if (batch == nullptr) continue;
3434     // Skip any batch that either (a) has already been started on this
3435     // subchannel call or (b) we can't start yet because we're still
3436     // replaying send ops that need to be completed first.
3437     // TODO(roth): Note that if any one op in the batch can't be sent
3438     // yet due to ops that we're replaying, we don't start any of the ops
3439     // in the batch.  This is probably okay, but it could conceivably
3440     // lead to increased latency in some cases -- e.g., we could delay
3441     // starting a recv op due to it being in the same batch with a send
3442     // op.  If/when we revamp the callback protocol in
3443     // transport_stream_op_batch, we may be able to fix this.
3444     if (batch->send_initial_metadata &&
3445         retry_state->started_send_initial_metadata) {
3446       continue;
3447     }
3448     if (batch->send_message && retry_state->completed_send_message_count <
3449                                    retry_state->started_send_message_count) {
3450       continue;
3451     }
3452     // Note that we only start send_trailing_metadata if we have no more
3453     // send_message ops to start, since we can't send down any more
3454     // send_message ops after send_trailing_metadata.
3455     if (batch->send_trailing_metadata &&
3456         (retry_state->started_send_message_count + batch->send_message <
3457              send_messages_.size() ||
3458          retry_state->started_send_trailing_metadata)) {
3459       continue;
3460     }
3461     if (batch->recv_initial_metadata &&
3462         retry_state->started_recv_initial_metadata) {
3463       continue;
3464     }
3465     if (batch->recv_message && retry_state->completed_recv_message_count <
3466                                    retry_state->started_recv_message_count) {
3467       continue;
3468     }
3469     if (batch->recv_trailing_metadata &&
3470         retry_state->started_recv_trailing_metadata) {
3471       // If we previously completed a recv_trailing_metadata op
3472       // initiated by StartInternalRecvTrailingMetadata(), use the
3473       // result of that instead of trying to re-start this op.
3474       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3475                         nullptr))) {
3476         // If the batch completed, then trigger the completion callback
3477         // directly, so that we return the previously returned results to
3478         // the application.  Otherwise, just unref the internally
3479         // started subchannel batch, since we'll propagate the
3480         // completion when it completes.
3481         if (retry_state->completed_recv_trailing_metadata) {
3482           // Batches containing recv_trailing_metadata always succeed.
3483           closures->Add(
3484               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3485               "re-executing recv_trailing_metadata_ready to propagate "
3486               "internally triggered result");
3487         } else {
3488           retry_state->recv_trailing_metadata_internal_batch->Unref();
3489         }
3490         retry_state->recv_trailing_metadata_internal_batch = nullptr;
3491       }
3492       continue;
3493     }
3494     // If we're not retrying, just send the batch as-is.
3495     if (method_params_ == nullptr ||
3496         method_params_->retry_policy() == nullptr || retry_committed_) {
3497       // TODO(roth): We should probably call
3498       // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3499       AddClosureForSubchannelBatch(elem, batch, closures);
3500       PendingBatchClear(pending);
3501       continue;
3502     }
3503     // Create batch with the right number of callbacks.
3504     const bool has_send_ops = batch->send_initial_metadata ||
3505                               batch->send_message ||
3506                               batch->send_trailing_metadata;
3507     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3508                               batch->recv_message +
3509                               batch->recv_trailing_metadata;
3510     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3511         elem, num_callbacks, has_send_ops /* set_on_complete */);
3512     // Cache send ops if needed.
3513     MaybeCacheSendOpsForBatch(pending);
3514     // send_initial_metadata.
3515     if (batch->send_initial_metadata) {
3516       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3517     }
3518     // send_message.
3519     if (batch->send_message) {
3520       AddRetriableSendMessageOp(elem, retry_state, batch_data);
3521     }
3522     // send_trailing_metadata.
3523     if (batch->send_trailing_metadata) {
3524       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3525     }
3526     // recv_initial_metadata.
3527     if (batch->recv_initial_metadata) {
3528       // recv_flags is only used on the server side.
3529       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3530       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3531     }
3532     // recv_message.
3533     if (batch->recv_message) {
3534       AddRetriableRecvMessageOp(retry_state, batch_data);
3535     }
3536     // recv_trailing_metadata.
3537     if (batch->recv_trailing_metadata) {
3538       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3539     }
3540     AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3541     // Track number of pending subchannel send batches.
3542     // If this is the first one, take a ref to the call stack.
3543     if (batch->send_initial_metadata || batch->send_message ||
3544         batch->send_trailing_metadata) {
3545       if (num_pending_retriable_subchannel_send_batches_ == 0) {
3546         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3547       }
3548       ++num_pending_retriable_subchannel_send_batches_;
3549     }
3550   }
3551 }
3552
void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          calld->subchannel_call_->GetParentData());
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  SubchannelCallBatchData* replay_batch_data =
      calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
                                        &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches_;
  }
  // Now add pending batches.
  calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call_.get());
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner_);
}

//
// LB pick
//

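// Creates the subchannel call on the connected subchannel produced by an
// LB pick.  When retries are enabled, space for the per-attempt retry
// state is reserved in the subchannel call's parent data.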
void CallData::CreateSubchannelCall(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t parent_data_size =
      enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
  SubchannelCall::Args call_args = {
      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_, parent_data_size};
  grpc_error* error = GRPC_ERROR_NONE;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, this, subchannel_call_.get(), grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    PendingBatchesFail(elem, error, YieldCallCombiner);
  } else {
    if (parent_data_size > 0) {
      new (subchannel_call_->GetParentData())
          SubchannelCallRetryState(call_context_);
    }
    PendingBatchesResume(elem);
  }
}

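// Schedules PickDone() asynchronously on the ExecCtx rather than invoking
// it inline; this lets callers complete a pick while still holding the
// data plane mutex, deferring the result handling until after the lock is
// released.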
void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&pick_closure_, error);
}

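// Invoked (directly or via AsyncPickDone()) when a pick is complete: on
// error, fails the call's pending batches; on success, creates the
// subchannel call.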
void CallData::PickDone(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
              calld, grpc_error_string(error));
    }
    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  calld->CreateSubchannelCall(elem);
}

// A class to handle the call combiner cancellation callback for a
// queued pick.
class CallData::QueuedPickCanceller {
 public:
  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_schedule_on_exec_ctx);
    calld->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<QueuedPickCanceller*>(arg);
    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
    auto* calld = static_cast<CallData*>(self->elem_->call_data);
    MutexLock lock(chand->data_plane_mu());
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: cancelling queued pick: "
              "error=%s self=%p calld->pick_canceller=%p",
              chand, calld, grpc_error_string(error), self,
              calld->pick_canceller_);
    }
    if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
      // Remove pick from list of queued picks.
      calld->RemoveCallFromQueuedPicksLocked(self->elem_);
      // Fail pending batches on the call.
      calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
                                YieldCallCombinerIfPendingBatchesFound);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
    Delete(self);
  }

  grpc_call_element* elem_;
  grpc_closure closure_;
};

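// Removes the call from the channel's list of queued picks and lames the
// canceller, so that a subsequent cancellation callback becomes a no-op.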
void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
            chand, this);
  }
  chand->RemoveQueuedPick(&pick_, pollent_);
  pick_queued_ = false;
  // Lame the call combiner canceller.
  pick_canceller_ = nullptr;
}

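// Adds the call to the channel's list of queued picks and registers a
// cancellation callback, so that the pick can be retried when a new
// picker becomes available or abandoned if the call is cancelled.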
void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
            this);
  }
  pick_queued_ = true;
  pick_.elem = elem;
  chand->AddQueuedPick(&pick_, pollent_);
  // Register call combiner cancellation callback.
  pick_canceller_ = New<QueuedPickCanceller>(elem);
}

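// NOTE: Illustrative sketch, not part of the original source.  A service
// config entry that would exercise the per-method logic below (deadline,
// wait_for_ready, and retry policy) might look like the following JSON;
// the service and method names are hypothetical:
//
//   {
//     "methodConfig": [{
//       "name": [{ "service": "example.EchoService", "method": "Echo" }],
//       "timeout": "1.5s",
//       "waitForReady": true,
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }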
void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, this);
  }
  // Store a ref to the service_config in service_config_call_data_.  Also,
  // save a pointer to service_config_call_data_ in the call_context, so
  // that filters that run later in the stack can access it.
  service_config_call_data_ =
      ServiceConfig::CallData(chand->service_config(), path_);
  if (service_config_call_data_.service_config() != nullptr) {
    call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
        &service_config_call_data_;
    method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
        service_config_call_data_.GetMethodParsedConfig(
            internal::ClientChannelServiceConfigParser::ParserIndex()));
  }
  retry_throttle_data_ = chand->retry_throttle_data();
  if (method_params_ != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
      const grpc_millis per_method_deadline =
          grpc_cycle_counter_to_millis_round_up(call_start_time_) +
          method_params_->timeout();
      if (per_method_deadline < deadline_) {
        deadline_ = per_method_deadline;
        grpc_deadline_state_reset(elem, deadline_);
      }
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    uint32_t* send_initial_metadata_flags =
        &pending_batches_[0]
             .batch->payload->send_initial_metadata.send_initial_metadata_flags;
    if (method_params_->wait_for_ready().has_value() &&
        !(*send_initial_metadata_flags &
          GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
      if (method_params_->wait_for_ready().value()) {
        *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      } else {
        *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
    enable_retries_ = false;
  }
}

void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Apply service config data to the call only once, and only if the
  // channel has the data available.
  if (GPR_LIKELY(chand->received_service_config_data() &&
                 !service_config_applied_)) {
    service_config_applied_ = true;
    ApplyServiceConfigToCallLocked(elem);
  }
}

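// Returns a human-readable name for an LB pick result type, for use in
// trace logging.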
const char* PickResultTypeName(
    LoadBalancingPolicy::PickResult::ResultType type) {
  switch (type) {
    case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
      return "COMPLETE";
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      return "QUEUE";
    case LoadBalancingPolicy::PickResult::PICK_FAILED:
      return "FAILED";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

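// Entry point for attempting an LB pick.  Holds the data plane mutex only
// for the duration of the pick itself; if the pick completes synchronously,
// the result is handled outside the lock via PickDone().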
void CallData::PickSubchannel(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  bool pick_complete;
  {
    MutexLock lock(chand->data_plane_mu());
    pick_complete = calld->PickSubchannelLocked(elem, &error);
  }
  if (pick_complete) {
    PickDone(elem, error);
    GRPC_ERROR_UNREF(error);
  }
}

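// Attempts an LB pick while holding the data plane mutex.  Returns true if
// the pick is complete (in which case *error indicates the result), or
// false if the pick has been queued pending a usable picker.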
bool CallData::PickSubchannelLocked(grpc_call_element* elem,
                                    grpc_error** error) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(connected_subchannel_ == nullptr);
  GPR_ASSERT(subchannel_call_ == nullptr);
  // The picker being null means that the channel is currently in IDLE state.
  // The incoming call will make the channel exit IDLE.
  if (chand->picker() == nullptr) {
    // Bounce into the control plane combiner to exit IDLE.
    chand->CheckConnectivityState(/*try_to_connect=*/true);
    // Queue the pick, so that it will be attempted once the channel
    // becomes connected.
    AddCallToQueuedPicksLocked(elem);
    return false;
  }
  // Apply service config to call if needed.
  MaybeApplyServiceConfigToCallLocked(elem);
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch.  The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by GetBatchIndex() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry?  We don't want
  // the new metadata to be added twice.  We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.call_state = &lb_call_state_;
  Metadata initial_metadata(
      this,
      seen_send_initial_metadata_
          ? &send_initial_metadata_
          : pending_batches_[0]
                .batch->payload->send_initial_metadata.send_initial_metadata);
  pick_args.initial_metadata = &initial_metadata;
  // Grab initial metadata flags so that we can check later if the call has
  // wait_for_ready enabled.
  const uint32_t send_initial_metadata_flags =
      seen_send_initial_metadata_ ? send_initial_metadata_flags_
                                  : pending_batches_[0]
                                        .batch->payload->send_initial_metadata
                                        .send_initial_metadata_flags;
  // Attempt pick.
  auto result = chand->picker()->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
            chand, this, PickResultTypeName(result.type),
            result.subchannel.get(), grpc_error_string(result.error));
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
      // If we're shutting down, fail all RPCs.
      grpc_error* disconnect_error = chand->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(result.error);
        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
        *error = GRPC_ERROR_REF(disconnect_error);
        return true;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        // Retry if appropriate; otherwise, fail.
        grpc_status_code status = GRPC_STATUS_OK;
        grpc_error_get_status(result.error, deadline_, &status, nullptr,
                              nullptr, nullptr);
        const bool retried = enable_retries_ &&
                             MaybeRetry(elem, nullptr /* batch_data */, status,
                                        nullptr /* server_pushback_md */);
        if (!retried) {
          grpc_error* new_error =
              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to pick subchannel", &result.error, 1);
          GRPC_ERROR_UNREF(result.error);
          *error = new_error;
        }
        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
        return !retried;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
      return false;
    default:  // PICK_COMPLETE
      if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Call dropped by load balancing policy");
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
            chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      lb_recv_trailing_metadata_ready_user_data_ =
          result.recv_trailing_metadata_ready_user_data;
      *error = result.error;
      return true;
  }
}

}  // namespace
}  // namespace grpc_core

/*************************************************************************
 * EXPORTED SYMBOLS
 */

using grpc_core::CallData;
using grpc_core::ChannelData;

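// Vtable for the client channel filter.  The entries are positional and
// correspond to the grpc_channel_filter struct members (noted inline
// below for readability).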
const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,  // start_transport_stream_op_batch
    ChannelData::StartTransportOp,          // start_transport_op
    sizeof(CallData),                       // sizeof_call_data
    CallData::Init,                         // init_call_elem
    CallData::SetPollent,                   // set_pollset_or_pollset_set
    CallData::Destroy,                      // destroy_call_elem
    sizeof(ChannelData),                    // sizeof_channel_data
    ChannelData::Init,                      // init_channel_elem
    ChannelData::Destroy,                   // destroy_channel_elem
    ChannelData::GetChannelInfo,            // get_channel_info
    "client-channel",                       // name
};

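// Returns the channel's current connectivity state.  If try_to_connect is
// non-zero and the channel is IDLE, this also triggers a connection
// attempt.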
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->CheckConnectivityState(try_to_connect);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->NumExternalConnectivityWatchers();
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->AddExternalConnectivityWatcher(pollent, state, closure,
                                               watcher_timer_init);
}

grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  return calld->subchannel_call();
}