Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / client_channel.cc
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/client_channel.cc b/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/client_channel.cc
new file mode 100644 (file)
index 0000000..0a9b5ac
--- /dev/null
@@ -0,0 +1,3961 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/filters/client_channel/backend_metric.h"
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
+#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/service_config.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/map.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_metadata.h"
+
+using grpc_core::internal::ClientChannelMethodParsedConfig;
+using grpc_core::internal::ServerRetryThrottleData;
+
+//
+// Client channel filter
+//
+
+// By default, we buffer 256 KiB per RPC for retries.
+// TODO(roth): Do we have any data to suggest a better value?
+#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
+
+// This value was picked arbitrarily.  It can be changed if there is
+// any even moderately compelling reason to do so.
+#define RETRY_BACKOFF_JITTER 0.2
+
+// Max number of batches that can be pending on a call at any given
+// time.  This includes one batch for each of the following ops:
+//   recv_initial_metadata
+//   send_initial_metadata
+//   recv_message
+//   send_message
+//   recv_trailing_metadata
+//   send_trailing_metadata
+#define MAX_PENDING_BATCHES 6
+
+namespace grpc_core {
+
+TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
+TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
+
+namespace {
+
+//
+// ChannelData definition
+//
+
+class ChannelData {
+ public:
+  struct QueuedPick {
+    grpc_call_element* elem;
+    QueuedPick* next = nullptr;
+  };
+
+  static grpc_error* Init(grpc_channel_element* elem,
+                          grpc_channel_element_args* args);
+  static void Destroy(grpc_channel_element* elem);
+  static void StartTransportOp(grpc_channel_element* elem,
+                               grpc_transport_op* op);
+  static void GetChannelInfo(grpc_channel_element* elem,
+                             const grpc_channel_info* info);
+
+  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
+  bool enable_retries() const { return enable_retries_; }
+  size_t per_rpc_retry_buffer_size() const {
+    return per_rpc_retry_buffer_size_;
+  }
+
+  // Note: Does NOT return a new ref.
+  grpc_error* disconnect_error() const {
+    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
+  }
+
+  Mutex* data_plane_mu() const { return &data_plane_mu_; }
+
+  LoadBalancingPolicy::SubchannelPicker* picker() const {
+    return picker_.get();
+  }
+  void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
+  void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
+
+  bool received_service_config_data() const {
+    return received_service_config_data_;
+  }
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
+    return retry_throttle_data_;
+  }
+  RefCountedPtr<ServiceConfig> service_config() const {
+    return service_config_;
+  }
+
+  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
+      SubchannelInterface* subchannel) const;
+
+  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
+  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
+                                      grpc_connectivity_state* state,
+                                      grpc_closure* on_complete,
+                                      grpc_closure* watcher_timer_init) {
+    // Will delete itself.
+    New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
+                                     watcher_timer_init);
+  }
+  int NumExternalConnectivityWatchers() const {
+    return external_connectivity_watcher_list_.size();
+  }
+
+ private:
+  class SubchannelWrapper;
+  class ClientChannelControlHelper;
+
+  class ExternalConnectivityWatcher {
+   public:
+    class WatcherList {
+     public:
+      WatcherList() { gpr_mu_init(&mu_); }
+      ~WatcherList() { gpr_mu_destroy(&mu_); }
+
+      int size() const;
+      ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
+      void Add(ExternalConnectivityWatcher* watcher);
+      void Remove(const ExternalConnectivityWatcher* watcher);
+
+     private:
+      // head_ is guarded by a mutex, since the size() method needs to
+      // iterate over the list, and it's called from the C-core API
+      // function grpc_channel_num_external_connectivity_watchers(), which
+      // is synchronous and therefore cannot run in the combiner.
+      mutable gpr_mu mu_;
+      ExternalConnectivityWatcher* head_ = nullptr;
+    };
+
+    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
+                                grpc_connectivity_state* state,
+                                grpc_closure* on_complete,
+                                grpc_closure* watcher_timer_init);
+
+    ~ExternalConnectivityWatcher();
+
+   private:
+    static void OnWatchCompleteLocked(void* arg, grpc_error* error);
+    static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
+
+    ChannelData* chand_;
+    grpc_polling_entity pollent_;
+    grpc_connectivity_state* state_;
+    grpc_closure* on_complete_;
+    grpc_closure* watcher_timer_init_;
+    grpc_closure my_closure_;
+    ExternalConnectivityWatcher* next_ = nullptr;
+  };
+
+  ChannelData(grpc_channel_element_args* args, grpc_error** error);
+  ~ChannelData();
+
+  void UpdateStateAndPickerLocked(
+      grpc_connectivity_state state, const char* reason,
+      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker);
+
+  void UpdateServiceConfigLocked(
+      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
+      RefCountedPtr<ServiceConfig> service_config);
+
+  void CreateResolvingLoadBalancingPolicyLocked();
+
+  void DestroyResolvingLoadBalancingPolicyLocked();
+
+  static bool ProcessResolverResultLocked(
+      void* arg, const Resolver::Result& result, const char** lb_policy_name,
+      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
+      grpc_error** service_config_error);
+
+  grpc_error* DoPingLocked(grpc_transport_op* op);
+
+  static void StartTransportOpLocked(void* arg, grpc_error* ignored);
+
+  static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
+
+  void ProcessLbPolicy(
+      const Resolver::Result& resolver_result,
+      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+      UniquePtr<char>* lb_policy_name,
+      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
+
+  //
+  // Fields set at construction and never modified.
+  //
+  const bool deadline_checking_enabled_;
+  const bool enable_retries_;
+  const size_t per_rpc_retry_buffer_size_;
+  grpc_channel_stack* owning_stack_;
+  ClientChannelFactory* client_channel_factory_;
+  const grpc_channel_args* channel_args_;
+  RefCountedPtr<ServiceConfig> default_service_config_;
+  UniquePtr<char> server_name_;
+  UniquePtr<char> target_uri_;
+  channelz::ChannelNode* channelz_node_;
+
+  //
+  // Fields used in the data plane.  Guarded by data_plane_mu.
+  //
+  mutable Mutex data_plane_mu_;
+  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
+  QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
+  // Data from service config.
+  bool received_service_config_data_ = false;
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+  RefCountedPtr<ServiceConfig> service_config_;
+
+  //
+  // Fields used in the control plane.  Guarded by combiner.
+  //
+  grpc_combiner* combiner_;
+  grpc_pollset_set* interested_parties_;
+  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
+  OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
+  grpc_connectivity_state_tracker state_tracker_;
+  ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
+  UniquePtr<char> health_check_service_name_;
+  RefCountedPtr<ServiceConfig> saved_service_config_;
+  bool received_first_resolver_result_ = false;
+  // The number of SubchannelWrapper instances referencing a given Subchannel.
+  Map<Subchannel*, int> subchannel_refcount_map_;
+  // The set of SubchannelWrappers that currently exist.
+  // No need to hold a ref, since the map is updated in the control-plane
+  // combiner when the SubchannelWrappers are created and destroyed.
+  // TODO(roth): We really want to use a set here, not a map.  Since we don't
+  // currently have a set implementation, we use a map and ignore the value.
+  Map<SubchannelWrapper*, bool> subchannel_wrappers_;
+  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
+  // Updates are queued here in the control plane combiner and then applied
+  // in the data plane mutex when the picker is updated.
+  Map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
+      RefCountedPtrLess<SubchannelWrapper>>
+      pending_subchannel_updates_;
+
+  //
+  // Fields accessed from both data plane mutex and control plane combiner.
+  //
+  Atomic<grpc_error*> disconnect_error_;
+
+  //
+  // Fields guarded by a mutex, since they need to be accessed
+  // synchronously via get_channel_info().
+  //
+  gpr_mu info_mu_;
+  UniquePtr<char> info_lb_policy_name_;
+  UniquePtr<char> info_service_config_json_;
+};
+
+//
+// CallData definition
+//
+
+class CallData {
+ public:
+  static grpc_error* Init(grpc_call_element* elem,
+                          const grpc_call_element_args* args);
+  static void Destroy(grpc_call_element* elem,
+                      const grpc_call_final_info* final_info,
+                      grpc_closure* then_schedule_closure);
+  static void StartTransportStreamOpBatch(
+      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
+  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
+
+  RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
+
+  // Invoked by channel for queued picks once resolver results are available.
+  void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
+
+  // Invoked by channel for queued picks when the picker is updated.
+  static void PickSubchannel(void* arg, grpc_error* error);
+
+  // Helper function for performing a pick while holding the data plane
+  // mutex.  Returns true if the pick is complete, in which case the caller
+  // must invoke PickDone() or AsyncPickDone() with the returned error.
+  bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error);
+
+  // Schedules a callback to process the completed pick.  The callback
+  // will not run until after this method returns.
+  void AsyncPickDone(grpc_call_element* elem, grpc_error* error);
+
+ private:
+  class QueuedPickCanceller;
+
+  class Metadata : public LoadBalancingPolicy::MetadataInterface {
+   public:
+    Metadata(CallData* calld, grpc_metadata_batch* batch)
+        : calld_(calld), batch_(batch) {}
+
+    void Add(StringView key, StringView value) override {
+      grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
+          calld_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
+      linked_mdelem->md = grpc_mdelem_from_slices(
+          grpc_core::ExternallyManagedSlice(key.data(), key.size()),
+          grpc_core::ExternallyManagedSlice(value.data(), value.size()));
+      GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
+                 GRPC_ERROR_NONE);
+    }
+
+    Iterator Begin() const override {
+      static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(Iterator),
+                    "iterator size too large");
+      return reinterpret_cast<Iterator>(batch_->list.head);
+    }
+    bool IsEnd(Iterator it) const override {
+      return reinterpret_cast<grpc_linked_mdelem*>(it) == nullptr;
+    }
+    void Next(Iterator* it) const override {
+      *it = reinterpret_cast<Iterator>(
+          reinterpret_cast<grpc_linked_mdelem*>(*it)->next);
+    }
+    StringView Key(Iterator it) const override {
+      return StringView(
+          GRPC_MDKEY(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
+    }
+    StringView Value(Iterator it) const override {
+      return StringView(
+          GRPC_MDVALUE(reinterpret_cast<grpc_linked_mdelem*>(it)->md));
+    }
+
+    void Erase(Iterator* it) override {
+      grpc_linked_mdelem* linked_mdelem =
+          reinterpret_cast<grpc_linked_mdelem*>(*it);
+      *it = reinterpret_cast<Iterator>(linked_mdelem->next);
+      grpc_metadata_batch_remove(batch_, linked_mdelem);
+    }
+
+   private:
+    CallData* calld_;
+    grpc_metadata_batch* batch_;
+  };
+
+  class LbCallState : public LoadBalancingPolicy::CallState {
+   public:
+    explicit LbCallState(CallData* calld) : calld_(calld) {}
+
+    void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }
+
+    const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
+        override {
+      if (calld_->backend_metric_data_ == nullptr) {
+        grpc_linked_mdelem* md = calld_->recv_trailing_metadata_->idx.named
+                                     .x_endpoint_load_metrics_bin;
+        if (md != nullptr) {
+          calld_->backend_metric_data_ =
+              ParseBackendMetricData(GRPC_MDVALUE(md->md), calld_->arena_);
+        }
+      }
+      return calld_->backend_metric_data_;
+    }
+
+   private:
+    CallData* calld_;
+  };
+
+  // State used for starting a retryable batch on a subchannel call.
+  // This provides its own grpc_transport_stream_op_batch and other data
+  // structures needed to populate the ops in the batch.
+  // We allocate one struct on the arena for each attempt at starting a
+  // batch on a given subchannel call.
+  struct SubchannelCallBatchData {
+    // Creates a SubchannelCallBatchData object on the call's arena with the
+    // specified refcount.  If set_on_complete is true, the batch's
+    // on_complete callback will be set to point to on_complete();
+    // otherwise, the batch's on_complete callback will be null.
+    static SubchannelCallBatchData* Create(grpc_call_element* elem,
+                                           int refcount, bool set_on_complete);
+
+    void Unref() {
+      if (gpr_unref(&refs)) Destroy();
+    }
+
+    SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
+                            int refcount, bool set_on_complete);
+    // All dtor code must be added in `Destroy()`. This is because we may
+    // call closures in `SubchannelCallBatchData` after they are unrefed by
+    // `Unref()`, and msan would complain about accessing this class
+    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
+    // TODO(soheil): We should try to call the dtor in `Unref()`.
+    ~SubchannelCallBatchData() { Destroy(); }
+    void Destroy();
+
+    gpr_refcount refs;
+    grpc_call_element* elem;
+    RefCountedPtr<SubchannelCall> subchannel_call;
+    // The batch to use in the subchannel call.
+    // Its payload field points to SubchannelCallRetryState::batch_payload.
+    grpc_transport_stream_op_batch batch;
+    // For intercepting on_complete.
+    grpc_closure on_complete;
+  };
+
+  // Retry state associated with a subchannel call.
+  // Stored in the parent_data of the subchannel call object.
+  struct SubchannelCallRetryState {
+    explicit SubchannelCallRetryState(grpc_call_context_element* context)
+        : batch_payload(context),
+          started_send_initial_metadata(false),
+          completed_send_initial_metadata(false),
+          started_send_trailing_metadata(false),
+          completed_send_trailing_metadata(false),
+          started_recv_initial_metadata(false),
+          completed_recv_initial_metadata(false),
+          started_recv_trailing_metadata(false),
+          completed_recv_trailing_metadata(false),
+          retry_dispatched(false) {}
+
+    // SubchannelCallBatchData.batch.payload points to this.
+    grpc_transport_stream_op_batch_payload batch_payload;
+    // For send_initial_metadata.
+    // Note that we need to make a copy of the initial metadata for each
+    // subchannel call instead of just referring to the copy in call_data,
+    // because filters in the subchannel stack will probably add entries,
+    // so we need to start in a pristine state for each attempt of the call.
+    grpc_linked_mdelem* send_initial_metadata_storage;
+    grpc_metadata_batch send_initial_metadata;
+    // For send_message.
+    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
+    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
+    // For send_trailing_metadata.
+    grpc_linked_mdelem* send_trailing_metadata_storage;
+    grpc_metadata_batch send_trailing_metadata;
+    // For intercepting recv_initial_metadata.
+    grpc_metadata_batch recv_initial_metadata;
+    grpc_closure recv_initial_metadata_ready;
+    bool trailing_metadata_available = false;
+    // For intercepting recv_message.
+    grpc_closure recv_message_ready;
+    OrphanablePtr<ByteStream> recv_message;
+    // For intercepting recv_trailing_metadata.
+    grpc_metadata_batch recv_trailing_metadata;
+    grpc_transport_stream_stats collect_stats;
+    grpc_closure recv_trailing_metadata_ready;
+    // These fields indicate which ops have been started and completed on
+    // this subchannel call.
+    size_t started_send_message_count = 0;
+    size_t completed_send_message_count = 0;
+    size_t started_recv_message_count = 0;
+    size_t completed_recv_message_count = 0;
+    bool started_send_initial_metadata : 1;
+    bool completed_send_initial_metadata : 1;
+    bool started_send_trailing_metadata : 1;
+    bool completed_send_trailing_metadata : 1;
+    bool started_recv_initial_metadata : 1;
+    bool completed_recv_initial_metadata : 1;
+    bool started_recv_trailing_metadata : 1;
+    bool completed_recv_trailing_metadata : 1;
+    // State for callback processing.
+    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
+        nullptr;
+    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
+    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
+    grpc_error* recv_message_error = GRPC_ERROR_NONE;
+    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
+    // NOTE: Do not move this next to the metadata bitfields above. That would
+    //       save space but will also result in a data race because compiler
+    //       will generate a 2 byte store which overwrites the meta-data
+    //       fields upon setting this field.
+    bool retry_dispatched : 1;
+  };
+
+  // Pending batches stored in call data.
+  struct PendingBatch {
+    // The pending batch.  If nullptr, this slot is empty.
+    grpc_transport_stream_op_batch* batch;
+    // Indicates whether payload for send ops has been cached in CallData.
+    bool send_ops_cached;
+  };
+
+  CallData(grpc_call_element* elem, const ChannelData& chand,
+           const grpc_call_element_args& args);
+  ~CallData();
+
+  // Caches data for send ops so that it can be retried later, if not
+  // already cached.
+  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
+  void FreeCachedSendInitialMetadata(ChannelData* chand);
+  // Frees cached send_message at index idx.
+  void FreeCachedSendMessage(ChannelData* chand, size_t idx);
+  void FreeCachedSendTrailingMetadata(ChannelData* chand);
+  // Frees cached send ops that have already been completed after
+  // committing the call.
+  void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
+                                       SubchannelCallRetryState* retry_state);
+  // Frees cached send ops that were completed by the completed batch in
+  // batch_data.  Used when batches are completed after the call is committed.
+  void FreeCachedSendOpDataForCompletedBatch(
+      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+      SubchannelCallRetryState* retry_state);
+
+  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
+      void* arg, grpc_error* error);
+  void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+      grpc_transport_stream_op_batch* batch);
+
+  // Returns the index into pending_batches_ to be used for batch.
+  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
+  void PendingBatchesAdd(grpc_call_element* elem,
+                         grpc_transport_stream_op_batch* batch);
+  void PendingBatchClear(PendingBatch* pending);
+  void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
+  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
+  // A predicate type and some useful implementations for PendingBatchesFail().
+  typedef bool (*YieldCallCombinerPredicate)(
+      const CallCombinerClosureList& closures);
+  static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
+    return true;
+  }
+  static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
+    return false;
+  }
+  static bool YieldCallCombinerIfPendingBatchesFound(
+      const CallCombinerClosureList& closures) {
+    return closures.size() > 0;
+  }
+  // Fails all pending batches.
+  // If yield_call_combiner_predicate returns true, assumes responsibility for
+  // yielding the call combiner.
+  void PendingBatchesFail(
+      grpc_call_element* elem, grpc_error* error,
+      YieldCallCombinerPredicate yield_call_combiner_predicate);
+  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
+  // Resumes all pending batches on subchannel_call_.
+  void PendingBatchesResume(grpc_call_element* elem);
+  // Returns a pointer to the first pending batch for which predicate(batch)
+  // returns true, or null if not found.
+  template <typename Predicate>
+  PendingBatch* PendingBatchFind(grpc_call_element* elem,
+                                 const char* log_message, Predicate predicate);
+
+  // Commits the call so that no further retry attempts will be performed.
+  void RetryCommit(grpc_call_element* elem,
+                   SubchannelCallRetryState* retry_state);
+  // Starts a retry after appropriate back-off.
+  void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+               grpc_millis server_pushback_ms);
+  // Returns true if the call is being retried.
+  bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+                  grpc_status_code status, grpc_mdelem* server_pushback_md);
+
+  // Invokes recv_initial_metadata_ready for a subchannel batch.
+  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
+  // Intercepts recv_initial_metadata_ready callback for retries.
+  // Commits the call and returns the initial metadata up the stack.
+  static void RecvInitialMetadataReady(void* arg, grpc_error* error);
+
+  // Invokes recv_message_ready for a subchannel batch.
+  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
+  // Intercepts recv_message_ready callback for retries.
+  // Commits the call and returns the message up the stack.
+  static void RecvMessageReady(void* arg, grpc_error* error);
+
+  // Sets *status and *server_pushback_md based on md_batch and error.
+  // Only sets *server_pushback_md if server_pushback_md != nullptr.
+  void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
+                     grpc_error* error, grpc_status_code* status,
+                     grpc_mdelem** server_pushback_md);
+  // Adds recv_trailing_metadata_ready closure to closures.
+  void AddClosureForRecvTrailingMetadataReady(
+      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+      grpc_error* error, CallCombinerClosureList* closures);
+  // Adds any necessary closures for deferred recv_initial_metadata and
+  // recv_message callbacks to closures.
+  static void AddClosuresForDeferredRecvCallbacks(
+      SubchannelCallBatchData* batch_data,
+      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
+  // Returns true if any op in the batch was not yet started.
+  // Only looks at send ops, since recv ops are always started immediately.
+  bool PendingBatchIsUnstarted(PendingBatch* pending,
+                               SubchannelCallRetryState* retry_state);
+  // For any pending batch containing an op that has not yet been started,
+  // adds the pending batch's completion closures to closures.
+  void AddClosuresToFailUnstartedPendingBatches(
+      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+      grpc_error* error, CallCombinerClosureList* closures);
+  // Runs necessary closures upon completion of a call attempt.
+  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
+                                   grpc_error* error);
+  // Intercepts recv_trailing_metadata_ready callback for retries.
+  // Commits the call and returns the trailing metadata up the stack.
+  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
+
+  // Adds the on_complete closure for the pending batch completed in
+  // batch_data to closures.
+  void AddClosuresForCompletedPendingBatch(
+      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+      SubchannelCallRetryState* retry_state, grpc_error* error,
+      CallCombinerClosureList* closures);
+
+  // If there are any cached ops to replay or pending ops to start on the
+  // subchannel call, adds a closure to closures to invoke
+  // StartRetriableSubchannelBatches().
+  void AddClosuresForReplayOrPendingSendOps(
+      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
+
+  // Callback used to intercept on_complete from subchannel calls.
+  // Called only when retries are enabled.
+  static void OnComplete(void* arg, grpc_error* error);
+
+  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
+  // Adds a closure to closures that will execute batch in the call combiner.
+  void AddClosureForSubchannelBatch(grpc_call_element* elem,
+                                    grpc_transport_stream_op_batch* batch,
+                                    CallCombinerClosureList* closures);
+  // Adds retriable send_initial_metadata op to batch_data.
+  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
+                                         SubchannelCallBatchData* batch_data);
+  // Adds retriable send_message op to batch_data.
+  void AddRetriableSendMessageOp(grpc_call_element* elem,
+                                 SubchannelCallRetryState* retry_state,
+                                 SubchannelCallBatchData* batch_data);
+  // Adds retriable send_trailing_metadata op to batch_data.
+  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
+                                          SubchannelCallBatchData* batch_data);
+  // Adds retriable recv_initial_metadata op to batch_data.
+  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
+                                         SubchannelCallBatchData* batch_data);
+  // Adds retriable recv_message op to batch_data.
+  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
+                                 SubchannelCallBatchData* batch_data);
+  // Adds retriable recv_trailing_metadata op to batch_data.
+  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
+                                          SubchannelCallBatchData* batch_data);
+  // Helper function used to start a recv_trailing_metadata batch.  This
+  // is used in the case where a recv_initial_metadata or recv_message
+  // op fails in a way that we know the call is over but when the application
+  // has not yet started its own recv_trailing_metadata op.
+  void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
+  // If there are any cached send ops that need to be replayed on the
+  // current subchannel call, creates and returns a new subchannel batch
+  // to replay those ops.  Otherwise, returns nullptr.
+  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
+      grpc_call_element* elem, SubchannelCallRetryState* retry_state);
+  // Adds subchannel batches for pending batches to closures.
+  void AddSubchannelBatchesForPendingBatches(
+      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+      CallCombinerClosureList* closures);
+  // Constructs and starts whatever subchannel batches are needed on the
+  // subchannel call.
+  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
+
+  void CreateSubchannelCall(grpc_call_element* elem);
+  // Invoked when a pick is completed, on both success or failure.
+  static void PickDone(void* arg, grpc_error* error);
+  // Removes the call from the channel's list of queued picks.
+  void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
+  // Adds the call to the channel's list of queued picks.
+  void AddCallToQueuedPicksLocked(grpc_call_element* elem);
+  // Applies service config to the call.  Must be invoked once we know
+  // that the resolver has returned results to the channel.
+  void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
+
+  // State for handling deadlines.
+  // The code in deadline_filter.c requires this to be the first field.
+  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+  // and this struct both independently store pointers to the call stack
+  // and call combiner.  If/when we have time, find a way to avoid this
+  // without breaking the grpc_deadline_state abstraction.
+  grpc_deadline_state deadline_state_;
+
+  grpc_slice path_;  // Request path.
+  gpr_cycle_counter call_start_time_;
+  grpc_millis deadline_;
+  Arena* arena_;
+  grpc_call_stack* owning_call_;
+  CallCombiner* call_combiner_;
+  grpc_call_context_element* call_context_;
+
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+  ServiceConfig::CallData service_config_call_data_;
+  const ClientChannelMethodParsedConfig* method_params_ = nullptr;
+
+  RefCountedPtr<SubchannelCall> subchannel_call_;
+
+  // Set when we get a cancel_stream op.
+  grpc_error* cancel_error_ = GRPC_ERROR_NONE;
+
+  ChannelData::QueuedPick pick_;
+  bool pick_queued_ = false;
+  bool service_config_applied_ = false;
+  QueuedPickCanceller* pick_canceller_ = nullptr;
+  LbCallState lb_call_state_;
+  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+  void (*lb_recv_trailing_metadata_ready_)(
+      void* user_data, grpc_error* error,
+      LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
+      LoadBalancingPolicy::CallState* call_state) = nullptr;
+  void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
+  grpc_closure pick_closure_;
+
+  // For intercepting recv_trailing_metadata_ready for the LB policy.
+  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
+  grpc_closure recv_trailing_metadata_ready_;
+  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+
+  grpc_polling_entity* pollent_ = nullptr;
+
+  // Batches are added to this list when received from above.
+  // They are removed when we are done handling the batch (i.e., when
+  // either we have invoked all of the batch's callbacks or we have
+  // passed the batch down to the subchannel call and are not
+  // intercepting any of its callbacks).
+  PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
+  bool pending_send_initial_metadata_ : 1;
+  bool pending_send_message_ : 1;
+  bool pending_send_trailing_metadata_ : 1;
+
+  // Retry state.
+  bool enable_retries_ : 1;
+  bool retry_committed_ : 1;
+  bool last_attempt_got_server_pushback_ : 1;
+  int num_attempts_completed_ = 0;
+  size_t bytes_buffered_for_retry_ = 0;
+  // TODO(roth): Restructure this to eliminate use of ManualConstructor.
+  ManualConstructor<BackOff> retry_backoff_;
+  grpc_timer retry_timer_;
+
+  // The number of pending retriable subchannel batches containing send ops.
+  // We hold a ref to the call stack while this is non-zero, since replay
+  // batches may not complete until after all callbacks have been returned
+  // to the surface, and we need to make sure that the call is not destroyed
+  // until all of these batches have completed.
+  // Note that we actually only need to track replay batches, but it's
+  // easier to track all batches with send ops.
+  int num_pending_retriable_subchannel_send_batches_ = 0;
+
+  // Cached data for retrying send ops.
+  // send_initial_metadata
+  bool seen_send_initial_metadata_ = false;
+  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
+  grpc_metadata_batch send_initial_metadata_;
+  uint32_t send_initial_metadata_flags_;
+  gpr_atm* peer_string_;
+  // send_message
+  // When we get a send_message op, we replace the original byte stream
+  // with a CachingByteStream that caches the slices to a local buffer for
+  // use in retries.
+  // Note: We inline the cache for the first 3 send_message ops and use
+  // dynamic allocation after that.  This number was essentially picked
+  // at random; it could be changed in the future to tune performance.
+  InlinedVector<ByteStreamCache*, 3> send_messages_;
+  // send_trailing_metadata
+  bool seen_send_trailing_metadata_ = false;
+  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
+  grpc_metadata_batch send_trailing_metadata_;
+};
+
+//
+// ChannelData::SubchannelWrapper
+//
+
+// This class is a wrapper for Subchannel that hides details of the
+// channel's implementation (such as the health check service name and
+// connected subchannel) from the LB policy API.
+//
+// Note that no synchronization is needed here, because even if the
+// underlying subchannel is shared between channels, this wrapper will only
+// be used within one channel, so it will always be synchronized by the
+// control plane combiner.
+class ChannelData::SubchannelWrapper : public SubchannelInterface {
+ public:
+  SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
+                    UniquePtr<char> health_check_service_name)
+      : SubchannelInterface(&grpc_client_channel_routing_trace),
+        chand_(chand),
+        subchannel_(subchannel),
+        health_check_service_name_(std::move(health_check_service_name)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: creating subchannel wrapper %p for subchannel %p",
+              chand, this, subchannel_);
+    }
+    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
+    auto* subchannel_node = subchannel_->channelz_node();
+    if (subchannel_node != nullptr) {
+      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+      if (it == chand_->subchannel_refcount_map_.end()) {
+        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
+        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
+      }
+      ++it->second;
+    }
+    chand_->subchannel_wrappers_[this] = true;
+  }
+
+  ~SubchannelWrapper() {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
+              chand_, this, subchannel_);
+    }
+    chand_->subchannel_wrappers_.erase(this);
+    auto* subchannel_node = subchannel_->channelz_node();
+    if (subchannel_node != nullptr) {
+      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
+      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
+      --it->second;
+      if (it->second == 0) {
+        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
+        chand_->subchannel_refcount_map_.erase(it);
+      }
+    }
+    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
+    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
+  }
+
+  grpc_connectivity_state CheckConnectivityState() override {
+    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    grpc_connectivity_state connectivity_state =
+        subchannel_->CheckConnectivityState(health_check_service_name_.get(),
+                                            &connected_subchannel);
+    MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
+    return connectivity_state;
+  }
+
+  void WatchConnectivityState(
+      grpc_connectivity_state initial_state,
+      UniquePtr<ConnectivityStateWatcherInterface> watcher) override {
+    auto& watcher_wrapper = watcher_map_[watcher.get()];
+    GPR_ASSERT(watcher_wrapper == nullptr);
+    watcher_wrapper = New<WatcherWrapper>(std::move(watcher),
+                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
+                                          initial_state);
+    subchannel_->WatchConnectivityState(
+        initial_state,
+        UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
+        OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
+            watcher_wrapper));
+  }
+
+  void CancelConnectivityStateWatch(
+      ConnectivityStateWatcherInterface* watcher) override {
+    auto it = watcher_map_.find(watcher);
+    GPR_ASSERT(it != watcher_map_.end());
+    subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
+                                              it->second);
+    watcher_map_.erase(it);
+  }
+
+  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
+
+  void ResetBackoff() override { subchannel_->ResetBackoff(); }
+
+  const grpc_channel_args* channel_args() override {
+    return subchannel_->channel_args();
+  }
+
+  void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: subchannel wrapper %p: updating health check service "
+              "name from \"%s\" to \"%s\"",
+              chand_, this, health_check_service_name_.get(),
+              health_check_service_name.get());
+    }
+    for (auto& p : watcher_map_) {
+      WatcherWrapper*& watcher_wrapper = p.second;
+      // Cancel the current watcher and create a new one using the new
+      // health check service name.
+      // TODO(roth): If there is not already an existing health watch
+      // call for the new name, then the watcher will initially report
+      // state CONNECTING.  If the LB policy is currently reporting
+      // state READY, this may cause it to switch to CONNECTING before
+      // switching back to READY.  This could cause a small delay for
+      // RPCs being started on the channel.  If/when this becomes a
+      // problem, we may be able to handle it by waiting for the new
+      // watcher to report READY before we use it to replace the old one.
+      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
+      subchannel_->CancelConnectivityStateWatch(
+          health_check_service_name_.get(), watcher_wrapper);
+      watcher_wrapper = replacement;
+      subchannel_->WatchConnectivityState(
+          replacement->last_seen_state(),
+          UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
+          OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
+              replacement));
+    }
+    // Save the new health check service name.
+    health_check_service_name_ = std::move(health_check_service_name);
+  }
+
+  // Caller must be holding the control-plane combiner.
+  ConnectedSubchannel* connected_subchannel() const {
+    return connected_subchannel_.get();
+  }
+
+  // Caller must be holding the data-plane mutex.
+  ConnectedSubchannel* connected_subchannel_in_data_plane() const {
+    return connected_subchannel_in_data_plane_.get();
+  }
+  void set_connected_subchannel_in_data_plane(
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
+    connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
+  }
+
+ private:
+  // Subchannel and SubchannelInterface have different interfaces for
+  // their respective ConnectivityStateWatcherInterface classes.
+  // The one in Subchannel updates the ConnectedSubchannel along with
+  // the state, whereas the one in SubchannelInterface does not expose
+  // the ConnectedSubchannel.
+  //
+  // This wrapper provides a bridge between the two.  It implements
+  // Subchannel::ConnectivityStateWatcherInterface and wraps
+  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
+  // that was passed in by the LB policy.  We pass an instance of this
+  // class to the underlying Subchannel, and when we get updates from
+  // the subchannel, we pass those on to the wrapped watcher to return
+  // the update to the LB policy.  This allows us to set the connected
+  // subchannel before passing the result back to the LB policy.
+  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
+   public:
+    WatcherWrapper(
+        UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>
+            watcher,
+        RefCountedPtr<SubchannelWrapper> parent,
+        grpc_connectivity_state initial_state)
+        : watcher_(std::move(watcher)),
+          parent_(std::move(parent)),
+          last_seen_state_(initial_state) {}
+
+    ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); }
+
+    void Orphan() override { Unref(); }
+
+    void OnConnectivityStateChange(
+        grpc_connectivity_state new_state,
+        RefCountedPtr<ConnectedSubchannel> connected_subchannel) override {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: connectivity change for subchannel wrapper %p "
+                "subchannel %p (connected_subchannel=%p state=%s); "
+                "hopping into combiner",
+                parent_->chand_, parent_.get(), parent_->subchannel_,
+                connected_subchannel.get(),
+                grpc_connectivity_state_name(new_state));
+      }
+      // Will delete itself.
+      New<Updater>(Ref(), new_state, std::move(connected_subchannel));
+    }
+
+    grpc_pollset_set* interested_parties() override {
+      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
+          watcher_.get();
+      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
+      return watcher->interested_parties();
+    }
+
+    WatcherWrapper* MakeReplacement() {
+      auto* replacement =
+          New<WatcherWrapper>(std::move(watcher_), parent_, last_seen_state_);
+      replacement_ = replacement;
+      return replacement;
+    }
+
+    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
+
+   private:
+    class Updater {
+     public:
+      Updater(RefCountedPtr<WatcherWrapper> parent,
+              grpc_connectivity_state new_state,
+              RefCountedPtr<ConnectedSubchannel> connected_subchannel)
+          : parent_(std::move(parent)),
+            state_(new_state),
+            connected_subchannel_(std::move(connected_subchannel)) {
+        GRPC_CLOSURE_INIT(
+            &closure_, ApplyUpdateInControlPlaneCombiner, this,
+            grpc_combiner_scheduler(parent_->parent_->chand_->combiner_));
+        GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+      }
+
+     private:
+      static void ApplyUpdateInControlPlaneCombiner(void* arg,
+                                                    grpc_error* error) {
+        Updater* self = static_cast<Updater*>(arg);
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+          gpr_log(GPR_INFO,
+                  "chand=%p: processing connectivity change in combiner "
+                  "for subchannel wrapper %p subchannel %p "
+                  "(connected_subchannel=%p state=%s): watcher=%p",
+                  self->parent_->parent_->chand_, self->parent_->parent_.get(),
+                  self->parent_->parent_->subchannel_,
+                  self->connected_subchannel_.get(),
+                  grpc_connectivity_state_name(self->state_),
+                  self->parent_->watcher_.get());
+        }
+        // Ignore update if the parent WatcherWrapper has been replaced
+        // since this callback was scheduled.
+        if (self->parent_->watcher_ == nullptr) return;
+        self->parent_->last_seen_state_ = self->state_;
+        self->parent_->parent_->MaybeUpdateConnectedSubchannel(
+            std::move(self->connected_subchannel_));
+        self->parent_->watcher_->OnConnectivityStateChange(self->state_);
+        Delete(self);
+      }
+
+      RefCountedPtr<WatcherWrapper> parent_;
+      grpc_connectivity_state state_;
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+      grpc_closure closure_;
+    };
+
+    UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_;
+    RefCountedPtr<SubchannelWrapper> parent_;
+    grpc_connectivity_state last_seen_state_;
+    WatcherWrapper* replacement_ = nullptr;
+  };
+
+  void MaybeUpdateConnectedSubchannel(
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
+    // Update the connected subchannel only if the channel is not shutting
+    // down.  This is because once the channel is shutting down, we
+    // ignore picker updates from the LB policy, which means that
+    // UpdateStateAndPickerLocked() will never process the entries
+    // in chand_->pending_subchannel_updates_.  So we don't want to add
+    // entries there that will never be processed, since that would
+    // leave dangling refs to the channel and prevent its destruction.
+    grpc_error* disconnect_error = chand_->disconnect_error();
+    if (disconnect_error != GRPC_ERROR_NONE) return;
+    // Not shutting down, so do the update.
+    if (connected_subchannel_ != connected_subchannel) {
+      connected_subchannel_ = std::move(connected_subchannel);
+      // Record the new connected subchannel so that it can be updated
+      // in the data plane mutex the next time the picker is updated.
+      chand_->pending_subchannel_updates_[Ref(
+          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
+    }
+  }
+
+  ChannelData* chand_;
+  Subchannel* subchannel_;
+  UniquePtr<char> health_check_service_name_;
+  // Maps from the address of the watcher passed to us by the LB policy
+  // to the address of the WrapperWatcher that we passed to the underlying
+  // subchannel.  This is needed so that when the LB policy calls
+  // CancelConnectivityStateWatch() with its watcher, we know the
+  // corresponding WrapperWatcher to cancel on the underlying subchannel.
+  Map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
+  // To be accessed only in the control plane combiner.
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+  // To be accessed only in the data plane mutex.
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
+};
+
+//
+// ChannelData::ExternalConnectivityWatcher::WatcherList
+//
+
+int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
+  MutexLock lock(&mu_);
+  int count = 0;
+  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+    ++count;
+  }
+  return count;
+}
+
+ChannelData::ExternalConnectivityWatcher*
+ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
+    grpc_closure* on_complete) const {
+  MutexLock lock(&mu_);
+  ExternalConnectivityWatcher* w = head_;
+  while (w != nullptr && w->on_complete_ != on_complete) {
+    w = w->next_;
+  }
+  return w;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
+    ExternalConnectivityWatcher* watcher) {
+  GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
+  MutexLock lock(&mu_);
+  GPR_ASSERT(watcher->next_ == nullptr);
+  watcher->next_ = head_;
+  head_ = watcher;
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
+    const ExternalConnectivityWatcher* watcher) {
+  MutexLock lock(&mu_);
+  if (watcher == head_) {
+    head_ = watcher->next_;
+    return;
+  }
+  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
+    if (w->next_ == watcher) {
+      w->next_ = w->next_->next_;
+      return;
+    }
+  }
+  GPR_UNREACHABLE_CODE(return );
+}
+
+//
+// ChannelData::ExternalConnectivityWatcher
+//
+
+ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
+    ChannelData* chand, grpc_polling_entity pollent,
+    grpc_connectivity_state* state, grpc_closure* on_complete,
+    grpc_closure* watcher_timer_init)
+    : chand_(chand),
+      pollent_(pollent),
+      state_(state),
+      on_complete_(on_complete),
+      watcher_timer_init_(watcher_timer_init) {
+  grpc_polling_entity_add_to_pollset_set(&pollent_,
+                                         chand_->interested_parties_);
+  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
+                        grpc_combiner_scheduler(chand_->combiner_)),
+      GRPC_ERROR_NONE);
+}
+
+ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
+  grpc_polling_entity_del_from_pollset_set(&pollent_,
+                                           chand_->interested_parties_);
+  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
+                           "ExternalConnectivityWatcher");
+}
+
+void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
+    void* arg, grpc_error* error) {
+  ExternalConnectivityWatcher* self =
+      static_cast<ExternalConnectivityWatcher*>(arg);
+  grpc_closure* on_complete = self->on_complete_;
+  self->chand_->external_connectivity_watcher_list_.Remove(self);
+  Delete(self);
+  GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
+}
+
+void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
+    void* arg, grpc_error* ignored) {
+  ExternalConnectivityWatcher* self =
+      static_cast<ExternalConnectivityWatcher*>(arg);
+  if (self->state_ == nullptr) {
+    // Handle cancellation.
+    GPR_ASSERT(self->watcher_timer_init_ == nullptr);
+    ExternalConnectivityWatcher* found =
+        self->chand_->external_connectivity_watcher_list_.Lookup(
+            self->on_complete_);
+    if (found != nullptr) {
+      grpc_connectivity_state_notify_on_state_change(
+          &found->chand_->state_tracker_, nullptr, &found->my_closure_);
+    }
+    Delete(self);
+    return;
+  }
+  // New watcher.
+  self->chand_->external_connectivity_watcher_list_.Add(self);
+  // This assumes that the closure is scheduled on the ExecCtx scheduler
+  // and that GRPC_CLOSURE_RUN would run the closure immediately.
+  GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
+  GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
+                    grpc_combiner_scheduler(self->chand_->combiner_));
+  grpc_connectivity_state_notify_on_state_change(
+      &self->chand_->state_tracker_, self->state_, &self->my_closure_);
+}
+
+//
+// ChannelData::ClientChannelControlHelper
+//
+
+class ChannelData::ClientChannelControlHelper
+    : public LoadBalancingPolicy::ChannelControlHelper {
+ public:
+  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
+    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
+  }
+
+  ~ClientChannelControlHelper() override {
+    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
+                             "ClientChannelControlHelper");
+  }
+
+  RefCountedPtr<SubchannelInterface> CreateSubchannel(
+      const grpc_channel_args& args) override {
+    bool inhibit_health_checking = grpc_channel_arg_get_bool(
+        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
+    UniquePtr<char> health_check_service_name;
+    if (!inhibit_health_checking) {
+      health_check_service_name.reset(
+          gpr_strdup(chand_->health_check_service_name_.get()));
+    }
+    static const char* args_to_remove[] = {
+        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
+        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
+    };
+    grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
+        chand_->subchannel_pool_.get());
+    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
+    Subchannel* subchannel =
+        chand_->client_channel_factory_->CreateSubchannel(new_args);
+    grpc_channel_args_destroy(new_args);
+    if (subchannel == nullptr) return nullptr;
+    return MakeRefCounted<SubchannelWrapper>(
+        chand_, subchannel, std::move(health_check_service_name));
+  }
+
+  void UpdateState(
+      grpc_connectivity_state state,
+      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
+    grpc_error* disconnect_error = chand_->disconnect_error();
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      const char* extra = disconnect_error == GRPC_ERROR_NONE
+                              ? ""
+                              : " (ignoring -- channel shutting down)";
+      gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
+              grpc_connectivity_state_name(state), picker.get(), extra);
+    }
+    // Do update only if not shutting down.
+    if (disconnect_error == GRPC_ERROR_NONE) {
+      chand_->UpdateStateAndPickerLocked(state, "helper", std::move(picker));
+    }
+  }
+
+  // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
+  void RequestReresolution() override {}
+
+  void AddTraceEvent(TraceSeverity severity, StringView message) override {
+    if (chand_->channelz_node_ != nullptr) {
+      chand_->channelz_node_->AddTraceEvent(
+          ConvertSeverityEnum(severity),
+          grpc_slice_from_copied_buffer(message.data(), message.size()));
+    }
+  }
+
+ private:
+  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
+      TraceSeverity severity) {
+    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
+    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
+    return channelz::ChannelTrace::Error;
+  }
+
+  ChannelData* chand_;
+};
+
+//
+// ChannelData implementation
+//
+
+grpc_error* ChannelData::Init(grpc_channel_element* elem,
+                              grpc_channel_element_args* args) {
+  GPR_ASSERT(args->is_last);
+  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+  grpc_error* error = GRPC_ERROR_NONE;
+  new (elem->channel_data) ChannelData(args, &error);
+  return error;
+}
+
+void ChannelData::Destroy(grpc_channel_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  chand->~ChannelData();
+}
+
+bool GetEnableRetries(const grpc_channel_args* args) {
+  return grpc_channel_arg_get_bool(
+      grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
+}
+
+size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
+  return static_cast<size_t>(grpc_channel_arg_get_integer(
+      grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
+      {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
+}
+
+RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
+    const grpc_channel_args* args) {
+  const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
+      grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
+  if (use_local_subchannel_pool) {
+    return MakeRefCounted<LocalSubchannelPool>();
+  }
+  return GlobalSubchannelPool::instance();
+}
+
+channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
+  const grpc_arg* arg =
+      grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
+  if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
+    return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
+  }
+  return nullptr;
+}
+
+ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
+    : deadline_checking_enabled_(
+          grpc_deadline_checking_enabled(args->channel_args)),
+      enable_retries_(GetEnableRetries(args->channel_args)),
+      per_rpc_retry_buffer_size_(
+          GetMaxPerRpcRetryBufferSize(args->channel_args)),
+      owning_stack_(args->channel_stack),
+      client_channel_factory_(
+          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
+      channelz_node_(GetChannelzNode(args->channel_args)),
+      combiner_(grpc_combiner_create()),
+      interested_parties_(grpc_pollset_set_create()),
+      subchannel_pool_(GetSubchannelPool(args->channel_args)),
+      disconnect_error_(GRPC_ERROR_NONE) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
+            this, owning_stack_);
+  }
+  // Initialize data members.
+  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+                               "client_channel");
+  gpr_mu_init(&info_mu_);
+  // Start backup polling.
+  grpc_client_channel_start_backup_polling(interested_parties_);
+  // Check client channel factory.
+  if (client_channel_factory_ == nullptr) {
+    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Missing client channel factory in args for client channel filter");
+    return;
+  }
+  // Get server name to resolve, using proxy mapper if needed.
+  const char* server_uri = grpc_channel_arg_get_string(
+      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
+  if (server_uri == nullptr) {
+    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "server URI channel arg missing or wrong type in client channel "
+        "filter");
+    return;
+  }
+  // Get default service config
+  const char* service_config_json = grpc_channel_arg_get_string(
+      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
+  if (service_config_json != nullptr) {
+    *error = GRPC_ERROR_NONE;
+    default_service_config_ = ServiceConfig::Create(service_config_json, error);
+    if (*error != GRPC_ERROR_NONE) {
+      default_service_config_.reset();
+      return;
+    }
+  }
+  grpc_uri* uri = grpc_uri_parse(server_uri, true);
+  if (uri != nullptr && uri->path[0] != '\0') {
+    server_name_.reset(
+        gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
+  }
+  grpc_uri_destroy(uri);
+  char* proxy_name = nullptr;
+  grpc_channel_args* new_args = nullptr;
+  grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
+                              &new_args);
+  target_uri_.reset(proxy_name != nullptr ? proxy_name
+                                          : gpr_strdup(server_uri));
+  channel_args_ = new_args != nullptr
+                      ? new_args
+                      : grpc_channel_args_copy(args->channel_args);
+  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
+    *error =
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid.");
+    return;
+  }
+  *error = GRPC_ERROR_NONE;
+}
+
+ChannelData::~ChannelData() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
+  }
+  DestroyResolvingLoadBalancingPolicyLocked();
+  grpc_channel_args_destroy(channel_args_);
+  // Stop backup polling.
+  grpc_client_channel_stop_backup_polling(interested_parties_);
+  grpc_pollset_set_destroy(interested_parties_);
+  GRPC_COMBINER_UNREF(combiner_, "client_channel");
+  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
+  grpc_connectivity_state_destroy(&state_tracker_);
+  gpr_mu_destroy(&info_mu_);
+}
+
+void ChannelData::UpdateStateAndPickerLocked(
+    grpc_connectivity_state state, const char* reason,
+    UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
+  // Clean the control plane when entering IDLE.
+  if (picker_ == nullptr) {
+    health_check_service_name_.reset();
+    saved_service_config_.reset();
+    received_first_resolver_result_ = false;
+  }
+  // Update connectivity state.
+  grpc_connectivity_state_set(&state_tracker_, state, reason);
+  if (channelz_node_ != nullptr) {
+    channelz_node_->SetConnectivityState(state);
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string(
+            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
+                state)));
+  }
+  // Grab data plane lock to do subchannel updates and update the picker.
+  //
+  // Note that we want to minimize the work done while holding the data
+  // plane lock, to keep the critical section small.  So, for all of the
+  // objects that we might wind up unreffing here, we actually hold onto
+  // the refs until after we release the lock, and then unref them at
+  // that point.  This includes the following:
+  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
+  // - ref stored in retry_throttle_data_
+  // - ref stored in service_config_
+  // - ownership of the existing picker in picker_
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_to_unref;
+  RefCountedPtr<ServiceConfig> service_config_to_unref;
+  {
+    MutexLock lock(&data_plane_mu_);
+    // Handle subchannel updates.
+    for (auto& p : pending_subchannel_updates_) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: updating subchannel wrapper %p data plane "
+                "connected_subchannel to %p",
+                this, p.first.get(), p.second.get());
+      }
+      // Note: We do not remove the entry from pending_subchannel_updates_
+      // here, since this would unref the subchannel wrapper; instead,
+      // we wait until we've released the lock to clear the map.
+      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
+    }
+    // Swap out the picker.
+    // Note: Original value will be destroyed after the lock is released.
+    picker_.swap(picker);
+    // Clean the data plane if the updated picker is nullptr.
+    if (picker_ == nullptr) {
+      received_service_config_data_ = false;
+      // Note: We save the objects to unref until after the lock is released.
+      retry_throttle_data_to_unref = std::move(retry_throttle_data_);
+      service_config_to_unref = std::move(service_config_);
+    }
+    // Re-process queued picks.
+    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
+      grpc_call_element* elem = pick->elem;
+      CallData* calld = static_cast<CallData*>(elem->call_data);
+      grpc_error* error = GRPC_ERROR_NONE;
+      if (calld->PickSubchannelLocked(elem, &error)) {
+        calld->AsyncPickDone(elem, error);
+      }
+    }
+  }
+  // Clear the pending update map after releasing the lock, to keep the
+  // critical section small.
+  pending_subchannel_updates_.clear();
+}
+
+void ChannelData::UpdateServiceConfigLocked(
+    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
+    RefCountedPtr<ServiceConfig> service_config) {
+  // Grab data plane lock to update service config.
+  //
+  // We defer unreffing the old values (and deallocating memory) until
+  // after releasing the lock to keep the critical section small.
+  {
+    MutexLock lock(&data_plane_mu_);
+    // Update service config.
+    received_service_config_data_ = true;
+    // Old values will be unreffed after lock is released.
+    retry_throttle_data_.swap(retry_throttle_data);
+    service_config_.swap(service_config);
+    // Apply service config to queued picks.
+    for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) {
+      CallData* calld = static_cast<CallData*>(pick->elem->call_data);
+      calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
+    }
+  }
+  // Old values will be unreffed after lock is released when they go out
+  // of scope.
+}
+
+void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
+  // Instantiate resolving LB policy.
+  LoadBalancingPolicy::Args lb_args;
+  lb_args.combiner = combiner_;
+  lb_args.channel_control_helper =
+      UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
+          New<ClientChannelControlHelper>(this));
+  lb_args.args = channel_args_;
+  UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
+  resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
+      std::move(lb_args), &grpc_client_channel_routing_trace,
+      std::move(target_uri), ProcessResolverResultLocked, this));
+  grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
+                                   interested_parties_);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
+            resolving_lb_policy_.get());
+  }
+}
+
+void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
+  if (resolving_lb_policy_ != nullptr) {
+    grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
+                                     interested_parties_);
+    resolving_lb_policy_.reset();
+  }
+}
+
+void ChannelData::ProcessLbPolicy(
+    const Resolver::Result& resolver_result,
+    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
+    UniquePtr<char>* lb_policy_name,
+    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
+  // Prefer the LB policy name found in the service config.
+  if (parsed_service_config != nullptr &&
+      parsed_service_config->parsed_lb_config() != nullptr) {
+    lb_policy_name->reset(
+        gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
+    *lb_policy_config = parsed_service_config->parsed_lb_config();
+    return;
+  }
+  const char* local_policy_name = nullptr;
+  if (parsed_service_config != nullptr &&
+      parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
+    local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
+  } else {
+    const grpc_arg* channel_arg =
+        grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
+    local_policy_name = grpc_channel_arg_get_string(channel_arg);
+  }
+  // Special case: If at least one balancer address is present, we use
+  // the grpclb policy, regardless of what the resolver has returned.
+  bool found_balancer_address = false;
+  for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
+    const ServerAddress& address = resolver_result.addresses[i];
+    if (address.IsBalancer()) {
+      found_balancer_address = true;
+      break;
+    }
+  }
+  if (found_balancer_address) {
+    if (local_policy_name != nullptr &&
+        strcmp(local_policy_name, "grpclb") != 0) {
+      gpr_log(GPR_INFO,
+              "resolver requested LB policy %s but provided at least one "
+              "balancer address -- forcing use of grpclb LB policy",
+              local_policy_name);
+    }
+    local_policy_name = "grpclb";
+  }
+  // Use pick_first if nothing was specified and we didn't select grpclb
+  // above.
+  lb_policy_name->reset(gpr_strdup(
+      local_policy_name == nullptr ? "pick_first" : local_policy_name));
+}
+
+// Synchronous callback from ResolvingLoadBalancingPolicy to process a
+// resolver result update.
+bool ChannelData::ProcessResolverResultLocked(
+    void* arg, const Resolver::Result& result, const char** lb_policy_name,
+    RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
+    grpc_error** service_config_error) {
+  ChannelData* chand = static_cast<ChannelData*>(arg);
+  RefCountedPtr<ServiceConfig> service_config;
+  // If resolver did not return a service config or returned an invalid service
+  // config, we need a fallback service config.
+  if (result.service_config_error != GRPC_ERROR_NONE) {
+    // If the service config was invalid, then fallback to the saved service
+    // config. If there is no saved config either, use the default service
+    // config.
+    if (chand->saved_service_config_ != nullptr) {
+      service_config = chand->saved_service_config_;
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: resolver returned invalid service config. "
+                "Continuing to use previous service config.",
+                chand);
+      }
+    } else if (chand->default_service_config_ != nullptr) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: resolver returned invalid service config. Using "
+                "default service config provided by client API.",
+                chand);
+      }
+      service_config = chand->default_service_config_;
+    }
+  } else if (result.service_config == nullptr) {
+    if (chand->default_service_config_ != nullptr) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p: resolver returned no service config. Using default "
+                "service config provided by client API.",
+                chand);
+      }
+      service_config = chand->default_service_config_;
+    }
+  } else {
+    service_config = result.service_config;
+  }
+  *service_config_error = GRPC_ERROR_REF(result.service_config_error);
+  if (service_config == nullptr &&
+      result.service_config_error != GRPC_ERROR_NONE) {
+    return false;
+  }
+  // Process service config.
+  UniquePtr<char> service_config_json;
+  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
+      nullptr;
+  if (service_config != nullptr) {
+    parsed_service_config =
+        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
+            service_config->GetGlobalParsedConfig(
+                internal::ClientChannelServiceConfigParser::ParserIndex()));
+  }
+  // Check if the config has changed.
+  const bool service_config_changed =
+      ((service_config == nullptr) !=
+       (chand->saved_service_config_ == nullptr)) ||
+      (service_config != nullptr &&
+       strcmp(service_config->service_config_json(),
+              chand->saved_service_config_->service_config_json()) != 0);
+  if (service_config_changed) {
+    service_config_json.reset(gpr_strdup(
+        service_config != nullptr ? service_config->service_config_json()
+                                  : ""));
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: resolver returned updated service config: \"%s\"",
+              chand, service_config_json.get());
+    }
+    // Save health check service name.
+    if (service_config != nullptr) {
+      chand->health_check_service_name_.reset(
+          gpr_strdup(parsed_service_config->health_check_service_name()));
+    } else {
+      chand->health_check_service_name_.reset();
+    }
+    // Update health check service name used by existing subchannel wrappers.
+    for (const auto& p : chand->subchannel_wrappers_) {
+      p.first->UpdateHealthCheckServiceName(
+          UniquePtr<char>(gpr_strdup(chand->health_check_service_name_.get())));
+    }
+    // Save service config.
+    chand->saved_service_config_ = std::move(service_config);
+  }
+  // We want to set the service config at least once. This should not really be
+  // needed, but we are doing it as a defensive approach. This can be removed,
+  // if we feel it is unnecessary.
+  if (service_config_changed || !chand->received_first_resolver_result_) {
+    chand->received_first_resolver_result_ = true;
+    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
+    if (parsed_service_config != nullptr) {
+      Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
+          retry_throttle_config = parsed_service_config->retry_throttling();
+      if (retry_throttle_config.has_value()) {
+        retry_throttle_data =
+            internal::ServerRetryThrottleMap::GetDataForServer(
+                chand->server_name_.get(),
+                retry_throttle_config.value().max_milli_tokens,
+                retry_throttle_config.value().milli_token_ratio);
+      }
+    }
+    chand->UpdateServiceConfigLocked(std::move(retry_throttle_data),
+                                     chand->saved_service_config_);
+  }
+  UniquePtr<char> processed_lb_policy_name;
+  chand->ProcessLbPolicy(result, parsed_service_config,
+                         &processed_lb_policy_name, lb_policy_config);
+  // Swap out the data used by GetChannelInfo().
+  {
+    MutexLock lock(&chand->info_mu_);
+    chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
+    if (service_config_json != nullptr) {
+      chand->info_service_config_json_ = std::move(service_config_json);
+    }
+  }
+  // Return results.
+  *lb_policy_name = chand->info_lb_policy_name_.get();
+  return service_config_changed;
+}
+
+grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
+  if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
+  }
+  LoadBalancingPolicy::PickResult result =
+      picker_->Pick(LoadBalancingPolicy::PickArgs());
+  ConnectedSubchannel* connected_subchannel = nullptr;
+  if (result.subchannel != nullptr) {
+    SubchannelWrapper* subchannel =
+        static_cast<SubchannelWrapper*>(result.subchannel.get());
+    connected_subchannel = subchannel->connected_subchannel();
+  }
+  if (connected_subchannel != nullptr) {
+    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
+  } else {
+    if (result.error == GRPC_ERROR_NONE) {
+      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "LB policy dropped call on ping");
+    }
+  }
+  return result.error;
+}
+
+void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
+  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
+  grpc_channel_element* elem =
+      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  // Connectivity watch.
+  if (op->on_connectivity_state_change != nullptr) {
+    grpc_connectivity_state_notify_on_state_change(
+        &chand->state_tracker_, op->connectivity_state,
+        op->on_connectivity_state_change);
+    op->on_connectivity_state_change = nullptr;
+    op->connectivity_state = nullptr;
+  }
+  // Ping.
+  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
+    grpc_error* error = chand->DoPingLocked(op);
+    if (error != GRPC_ERROR_NONE) {
+      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
+      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
+    }
+    op->bind_pollset = nullptr;
+    op->send_ping.on_initiate = nullptr;
+    op->send_ping.on_ack = nullptr;
+  }
+  // Reset backoff.
+  if (op->reset_connect_backoff) {
+    if (chand->resolving_lb_policy_ != nullptr) {
+      chand->resolving_lb_policy_->ResetBackoffLocked();
+    }
+  }
+  // Disconnect or enter IDLE.
+  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", chand,
+              grpc_error_string(op->disconnect_with_error));
+    }
+    chand->DestroyResolvingLoadBalancingPolicyLocked();
+    intptr_t value;
+    if (grpc_error_get_int(op->disconnect_with_error,
+                           GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
+        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
+      if (chand->disconnect_error() == GRPC_ERROR_NONE) {
+        // Enter IDLE state.
+        chand->UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE,
+                                          "channel entering IDLE", nullptr);
+      }
+      GRPC_ERROR_UNREF(op->disconnect_with_error);
+    } else {
+      // Disconnect.
+      GPR_ASSERT(chand->disconnect_error_.Load(MemoryOrder::RELAXED) ==
+                 GRPC_ERROR_NONE);
+      chand->disconnect_error_.Store(op->disconnect_with_error,
+                                     MemoryOrder::RELEASE);
+      chand->UpdateStateAndPickerLocked(
+          GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
+          UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
+              New<LoadBalancingPolicy::TransientFailurePicker>(
+                  GRPC_ERROR_REF(op->disconnect_with_error))));
+    }
+  }
+  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
+  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
+}
+
+void ChannelData::StartTransportOp(grpc_channel_element* elem,
+                                   grpc_transport_op* op) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  GPR_ASSERT(op->set_accept_stream == false);
+  // Handle bind_pollset.
+  if (op->bind_pollset != nullptr) {
+    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
+  }
+  // Pop into control plane combiner for remaining ops.
+  op->handler_private.extra_arg = elem;
+  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&op->handler_private.closure,
+                        ChannelData::StartTransportOpLocked, op,
+                        grpc_combiner_scheduler(chand->combiner_)),
+      GRPC_ERROR_NONE);
+}
+
+void ChannelData::GetChannelInfo(grpc_channel_element* elem,
+                                 const grpc_channel_info* info) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  MutexLock lock(&chand->info_mu_);
+  if (info->lb_policy_name != nullptr) {
+    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
+  }
+  if (info->service_config_json != nullptr) {
+    *info->service_config_json =
+        gpr_strdup(chand->info_service_config_json_.get());
+  }
+}
+
+void ChannelData::AddQueuedPick(QueuedPick* pick,
+                                grpc_polling_entity* pollent) {
+  // Add call to queued picks list.
+  pick->next = queued_picks_;
+  queued_picks_ = pick;
+  // Add call's pollent to channel's interested_parties, so that I/O
+  // can be done under the call's CQ.
+  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
+}
+
+void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
+                                   grpc_polling_entity* pollent) {
+  // Remove call's pollent from channel's interested_parties.
+  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
+  // Remove from queued picks list.
+  for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
+       pick = &(*pick)->next) {
+    if (*pick == to_remove) {
+      *pick = to_remove->next;
+      return;
+    }
+  }
+}
+
+RefCountedPtr<ConnectedSubchannel>
+ChannelData::GetConnectedSubchannelInDataPlane(
+    SubchannelInterface* subchannel) const {
+  SubchannelWrapper* subchannel_wrapper =
+      static_cast<SubchannelWrapper*>(subchannel);
+  ConnectedSubchannel* connected_subchannel =
+      subchannel_wrapper->connected_subchannel_in_data_plane();
+  if (connected_subchannel == nullptr) return nullptr;
+  return connected_subchannel->Ref();
+}
+
+void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
+  auto* chand = static_cast<ChannelData*>(arg);
+  if (chand->resolving_lb_policy_ != nullptr) {
+    chand->resolving_lb_policy_->ExitIdleLocked();
+  } else {
+    chand->CreateResolvingLoadBalancingPolicyLocked();
+  }
+  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
+}
+
+grpc_connectivity_state ChannelData::CheckConnectivityState(
+    bool try_to_connect) {
+  grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
+  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
+    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
+                                           grpc_combiner_scheduler(combiner_)),
+                       GRPC_ERROR_NONE);
+  }
+  return out;
+}
+
+//
+// CallData implementation
+//
+
+// Retry support:
+//
+// In order to support retries, we act as a proxy for stream op batches.
+// When we get a batch from the surface, we add it to our list of pending
+// batches, and we then use those batches to construct separate "child"
+// batches to be started on the subchannel call.  When the child batches
+// return, we then decide which pending batches have been completed and
+// schedule their callbacks accordingly.  If a subchannel call fails and
+// we want to retry it, we do a new pick and start again, constructing
+// new "child" batches for the new subchannel call.
+//
+// Note that retries are committed when receiving data from the server
+// (except for Trailers-Only responses).  However, there may be many
+// send ops started before receiving any data, so we may have already
+// completed some number of send ops (and returned the completions up to
+// the surface) by the time we realize that we need to retry.  To deal
+// with this, we cache data for send ops, so that we can replay them on a
+// different subchannel call even after we have completed the original
+// batches.
+//
+// There are two sets of data to maintain:
+// - In call_data (in the parent channel), we maintain a list of pending
+//   ops and cached data for send ops.
+// - In the subchannel call, we maintain state to indicate what ops have
+//   already been sent down to that call.
+//
+// When constructing the "child" batches, we compare those two sets of
+// data to see which batches need to be sent to the subchannel call.
+
+// TODO(roth): In subsequent PRs:
+// - add support for transparent retries (including initial metadata)
+// - figure out how to record stats in census for retries
+//   (census filter is on top of this one)
+// - add census stats for retries
+
+CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
+                   const grpc_call_element_args& args)
+    : deadline_state_(elem, args.call_stack, args.call_combiner,
+                      GPR_LIKELY(chand.deadline_checking_enabled())
+                          ? args.deadline
+                          : GRPC_MILLIS_INF_FUTURE),
+      path_(grpc_slice_ref_internal(args.path)),
+      call_start_time_(args.start_time),
+      deadline_(args.deadline),
+      arena_(args.arena),
+      owning_call_(args.call_stack),
+      call_combiner_(args.call_combiner),
+      call_context_(args.context),
+      lb_call_state_(this),
+      pending_send_initial_metadata_(false),
+      pending_send_message_(false),
+      pending_send_trailing_metadata_(false),
+      enable_retries_(chand.enable_retries()),
+      retry_committed_(false),
+      last_attempt_got_server_pushback_(false) {}
+
+CallData::~CallData() {
+  grpc_slice_unref_internal(path_);
+  GRPC_ERROR_UNREF(cancel_error_);
+  if (backend_metric_data_ != nullptr) {
+    backend_metric_data_
+        ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
+  }
+  // Make sure there are no remaining pending batches.
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    GPR_ASSERT(pending_batches_[i].batch == nullptr);
+  }
+}
+
+grpc_error* CallData::Init(grpc_call_element* elem,
+                           const grpc_call_element_args* args) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  new (elem->call_data) CallData(elem, *chand, *args);
+  return GRPC_ERROR_NONE;
+}
+
+void CallData::Destroy(grpc_call_element* elem,
+                       const grpc_call_final_info* final_info,
+                       grpc_closure* then_schedule_closure) {
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
+    calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
+    then_schedule_closure = nullptr;
+  }
+  calld->~CallData();
+  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
+}
+
+void CallData::StartTransportStreamOpBatch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
+    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
+  }
+  // If we've previously been cancelled, immediately fail any new batches.
+  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
+              chand, calld, grpc_error_string(calld->cancel_error_));
+    }
+    // Note: This will release the call combiner.
+    grpc_transport_stream_op_batch_finish_with_failure(
+        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
+    return;
+  }
+  // Handle cancellation.
+  if (GPR_UNLIKELY(batch->cancel_stream)) {
+    // Stash a copy of cancel_error in our call data, so that we can use
+    // it for subsequent operations.  This ensures that if the call is
+    // cancelled before any batches are passed down (e.g., if the deadline
+    // is in the past when the call starts), we can return the right
+    // error to the caller when the first batch does get passed down.
+    GRPC_ERROR_UNREF(calld->cancel_error_);
+    calld->cancel_error_ =
+        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
+              calld, grpc_error_string(calld->cancel_error_));
+    }
+    // If we do not have a subchannel call (i.e., a pick has not yet
+    // been started), fail all pending batches.  Otherwise, send the
+    // cancellation down to the subchannel call.
+    if (calld->subchannel_call_ == nullptr) {
+      // TODO(roth): If there is a pending retry callback, do we need to
+      // cancel it here?
+      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
+                                NoYieldCallCombiner);
+      // Note: This will release the call combiner.
+      grpc_transport_stream_op_batch_finish_with_failure(
+          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
+    } else {
+      // Note: This will release the call combiner.
+      calld->subchannel_call_->StartTransportStreamOpBatch(batch);
+    }
+    return;
+  }
+  // Add the batch to the pending list.
+  calld->PendingBatchesAdd(elem, batch);
+  // Check if we've already gotten a subchannel call.
+  // Note that once we have picked a subchannel, we do not need to acquire
+  // the channel's data plane mutex, which is more efficient (especially for
+  // streaming calls).
+  if (calld->subchannel_call_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
+              calld, calld->subchannel_call_.get());
+    }
+    calld->PendingBatchesResume(elem);
+    return;
+  }
+  // We do not yet have a subchannel call.
+  // For batches containing a send_initial_metadata op, acquire the
+  // channel's data plane mutex to pick a subchannel.
+  if (GPR_LIKELY(batch->send_initial_metadata)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: grabbing data plane mutex to perform pick",
+              chand, calld);
+    }
+    PickSubchannel(elem, GRPC_ERROR_NONE);
+  } else {
+    // For all other batches, release the call combiner.
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
+              calld);
+    }
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
+                            "batch does not include send_initial_metadata");
+  }
+}
+
+void CallData::SetPollent(grpc_call_element* elem,
+                          grpc_polling_entity* pollent) {
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  calld->pollent_ = pollent;
+}
+
+//
+// send op data caching
+//
+
+void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
+  if (pending->send_ops_cached) return;
+  pending->send_ops_cached = true;
+  grpc_transport_stream_op_batch* batch = pending->batch;
+  // Save a copy of metadata for send_initial_metadata ops.
+  if (batch->send_initial_metadata) {
+    seen_send_initial_metadata_ = true;
+    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
+    grpc_metadata_batch* send_initial_metadata =
+        batch->payload->send_initial_metadata.send_initial_metadata;
+    send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
+        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
+    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
+                             send_initial_metadata_storage_);
+    send_initial_metadata_flags_ =
+        batch->payload->send_initial_metadata.send_initial_metadata_flags;
+    peer_string_ = batch->payload->send_initial_metadata.peer_string;
+  }
+  // Set up cache for send_message ops.
+  if (batch->send_message) {
+    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
+        std::move(batch->payload->send_message.send_message));
+    send_messages_.push_back(cache);
+  }
+  // Save metadata batch for send_trailing_metadata ops.
+  if (batch->send_trailing_metadata) {
+    seen_send_trailing_metadata_ = true;
+    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
+    grpc_metadata_batch* send_trailing_metadata =
+        batch->payload->send_trailing_metadata.send_trailing_metadata;
+    send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
+        sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
+    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
+                             send_trailing_metadata_storage_);
+  }
+}
+
+void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
+            this);
+  }
+  grpc_metadata_batch_destroy(&send_initial_metadata_);
+}
+
+void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
+            chand, this, idx);
+  }
+  send_messages_[idx]->Destroy();
+}
+
+void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
+            chand, this);
+  }
+  grpc_metadata_batch_destroy(&send_trailing_metadata_);
+}
+
+void CallData::FreeCachedSendOpDataAfterCommit(
+    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (retry_state->completed_send_initial_metadata) {
+    FreeCachedSendInitialMetadata(chand);
+  }
+  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
+    FreeCachedSendMessage(chand, i);
+  }
+  if (retry_state->completed_send_trailing_metadata) {
+    FreeCachedSendTrailingMetadata(chand);
+  }
+}
+
+void CallData::FreeCachedSendOpDataForCompletedBatch(
+    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+    SubchannelCallRetryState* retry_state) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (batch_data->batch.send_initial_metadata) {
+    FreeCachedSendInitialMetadata(chand);
+  }
+  if (batch_data->batch.send_message) {
+    FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
+  }
+  if (batch_data->batch.send_trailing_metadata) {
+    FreeCachedSendTrailingMetadata(chand);
+  }
+}
+
+//
+// LB recv_trailing_metadata_ready handling
+//
+
+void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
+    void* arg, grpc_error* error) {
+  CallData* calld = static_cast<CallData*>(arg);
+  // Invoke callback to LB policy.
+  Metadata trailing_metadata(calld, calld->recv_trailing_metadata_);
+  calld->lb_recv_trailing_metadata_ready_(
+      calld->lb_recv_trailing_metadata_ready_user_data_, error,
+      &trailing_metadata, &calld->lb_call_state_);
+  // Chain to original callback.
+  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
+                   GRPC_ERROR_REF(error));
+}
+
+void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+    grpc_transport_stream_op_batch* batch) {
+  if (lb_recv_trailing_metadata_ready_ != nullptr) {
+    recv_trailing_metadata_ =
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+    original_recv_trailing_metadata_ready_ =
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+                      RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
+                      grpc_schedule_on_exec_ctx);
+    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+        &recv_trailing_metadata_ready_;
+  }
+}
+
+//
+// pending_batches management
+//
+
+size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
+  // Note: It is important the send_initial_metadata be the first entry
+  // here, since the code in pick_subchannel_locked() assumes it will be.
+  if (batch->send_initial_metadata) return 0;
+  if (batch->send_message) return 1;
+  if (batch->send_trailing_metadata) return 2;
+  if (batch->recv_initial_metadata) return 3;
+  if (batch->recv_message) return 4;
+  if (batch->recv_trailing_metadata) return 5;
+  GPR_UNREACHABLE_CODE(return (size_t)-1);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void CallData::PendingBatchesAdd(grpc_call_element* elem,
+                                 grpc_transport_stream_op_batch* batch) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  const size_t idx = GetBatchIndex(batch);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
+            this, idx);
+  }
+  PendingBatch* pending = &pending_batches_[idx];
+  GPR_ASSERT(pending->batch == nullptr);
+  pending->batch = batch;
+  pending->send_ops_cached = false;
+  if (enable_retries_) {
+    // Update state in calld about pending batches.
+    // Also check if the batch takes us over the retry buffer limit.
+    // Note: We don't check the size of trailing metadata here, because
+    // gRPC clients do not send trailing metadata.
+    if (batch->send_initial_metadata) {
+      pending_send_initial_metadata_ = true;
+      bytes_buffered_for_retry_ += grpc_metadata_batch_size(
+          batch->payload->send_initial_metadata.send_initial_metadata);
+    }
+    if (batch->send_message) {
+      pending_send_message_ = true;
+      bytes_buffered_for_retry_ +=
+          batch->payload->send_message.send_message->length();
+    }
+    if (batch->send_trailing_metadata) {
+      pending_send_trailing_metadata_ = true;
+    }
+    if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
+                     chand->per_rpc_retry_buffer_size())) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: exceeded retry buffer size, committing",
+                chand, this);
+      }
+      SubchannelCallRetryState* retry_state =
+          subchannel_call_ == nullptr ? nullptr
+                                      : static_cast<SubchannelCallRetryState*>(
+                                            subchannel_call_->GetParentData());
+      RetryCommit(elem, retry_state);
+      // If we are not going to retry and have not yet started, pretend
+      // retries are disabled so that we don't bother with retry overhead.
+      if (num_attempts_completed_ == 0) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+          gpr_log(GPR_INFO,
+                  "chand=%p calld=%p: disabling retries before first attempt",
+                  chand, this);
+        }
+        enable_retries_ = false;
+      }
+    }
+  }
+}
+
+void CallData::PendingBatchClear(PendingBatch* pending) {
+  if (enable_retries_) {
+    if (pending->batch->send_initial_metadata) {
+      pending_send_initial_metadata_ = false;
+    }
+    if (pending->batch->send_message) {
+      pending_send_message_ = false;
+    }
+    if (pending->batch->send_trailing_metadata) {
+      pending_send_trailing_metadata_ = false;
+    }
+  }
+  pending->batch = nullptr;
+}
+
+void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
+                                      PendingBatch* pending) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  grpc_transport_stream_op_batch* batch = pending->batch;
+  // We clear the pending batch if all of its callbacks have been
+  // scheduled and reset to nullptr.
+  if (batch->on_complete == nullptr &&
+      (!batch->recv_initial_metadata ||
+       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
+           nullptr) &&
+      (!batch->recv_message ||
+       batch->payload->recv_message.recv_message_ready == nullptr) &&
+      (!batch->recv_trailing_metadata ||
+       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
+           nullptr)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
+              this);
+    }
+    PendingBatchClear(pending);
+  }
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  grpc_transport_stream_op_batch_finish_with_failure(
+      batch, GRPC_ERROR_REF(error), calld->call_combiner_);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void CallData::PendingBatchesFail(
+    grpc_call_element* elem, grpc_error* error,
+    YieldCallCombinerPredicate yield_call_combiner_predicate) {
+  GPR_ASSERT(error != GRPC_ERROR_NONE);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    size_t num_batches = 0;
+    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+      if (pending_batches_[i].batch != nullptr) ++num_batches;
+    }
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
+            elem->channel_data, this, num_batches, grpc_error_string(error));
+  }
+  CallCombinerClosureList closures;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    PendingBatch* pending = &pending_batches_[i];
+    grpc_transport_stream_op_batch* batch = pending->batch;
+    if (batch != nullptr) {
+      if (batch->recv_trailing_metadata) {
+        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
+      }
+      batch->handler_private.extra_arg = this;
+      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                        FailPendingBatchInCallCombiner, batch,
+                        grpc_schedule_on_exec_ctx);
+      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
+                   "PendingBatchesFail");
+      PendingBatchClear(pending);
+    }
+  }
+  if (yield_call_combiner_predicate(closures)) {
+    closures.RunClosures(call_combiner_);
+  } else {
+    closures.RunClosuresWithoutYielding(call_combiner_);
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void CallData::ResumePendingBatchInCallCombiner(void* arg,
+                                                grpc_error* ignored) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  SubchannelCall* subchannel_call =
+      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  subchannel_call->StartTransportStreamOpBatch(batch);
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+void CallData::PendingBatchesResume(grpc_call_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (enable_retries_) {
+    StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
+    return;
+  }
+  // Retries not enabled; send down batches as-is.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    size_t num_batches = 0;
+    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+      if (pending_batches_[i].batch != nullptr) ++num_batches;
+    }
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: starting %" PRIuPTR
+            " pending batches on subchannel_call=%p",
+            chand, this, num_batches, subchannel_call_.get());
+  }
+  CallCombinerClosureList closures;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    PendingBatch* pending = &pending_batches_[i];
+    grpc_transport_stream_op_batch* batch = pending->batch;
+    if (batch != nullptr) {
+      if (batch->recv_trailing_metadata) {
+        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
+      }
+      batch->handler_private.extra_arg = subchannel_call_.get();
+      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+                        ResumePendingBatchInCallCombiner, batch,
+                        grpc_schedule_on_exec_ctx);
+      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+                   "PendingBatchesResume");
+      PendingBatchClear(pending);
+    }
+  }
+  // Note: This will release the call combiner.
+  closures.RunClosures(call_combiner_);
+}
+
+template <typename Predicate>
+CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
+                                                   const char* log_message,
+                                                   Predicate predicate) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    PendingBatch* pending = &pending_batches_[i];
+    grpc_transport_stream_op_batch* batch = pending->batch;
+    if (batch != nullptr && predicate(batch)) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
+                this, log_message, i);
+      }
+      return pending;
+    }
+  }
+  return nullptr;
+}
+
+//
+// retry code
+//
+
+void CallData::RetryCommit(grpc_call_element* elem,
+                           SubchannelCallRetryState* retry_state) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (retry_committed_) return;
+  retry_committed_ = true;
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
+  }
+  if (retry_state != nullptr) {
+    FreeCachedSendOpDataAfterCommit(elem, retry_state);
+  }
+}
+
+void CallData::DoRetry(grpc_call_element* elem,
+                       SubchannelCallRetryState* retry_state,
+                       grpc_millis server_pushback_ms) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  GPR_ASSERT(method_params_ != nullptr);
+  const auto* retry_policy = method_params_->retry_policy();
+  GPR_ASSERT(retry_policy != nullptr);
+  // Reset subchannel call.
+  subchannel_call_.reset();
+  // Compute backoff delay.
+  grpc_millis next_attempt_time;
+  if (server_pushback_ms >= 0) {
+    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
+    last_attempt_got_server_pushback_ = true;
+  } else {
+    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
+      retry_backoff_.Init(
+          BackOff::Options()
+              .set_initial_backoff(retry_policy->initial_backoff)
+              .set_multiplier(retry_policy->backoff_multiplier)
+              .set_jitter(RETRY_BACKOFF_JITTER)
+              .set_max_backoff(retry_policy->max_backoff));
+      last_attempt_got_server_pushback_ = false;
+    }
+    next_attempt_time = retry_backoff_->NextAttemptTime();
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
+            this, next_attempt_time - ExecCtx::Get()->Now());
+  }
+  // Schedule retry after computed delay.
+  GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem,
+                    grpc_schedule_on_exec_ctx);
+  grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
+  // Update bookkeeping.
+  if (retry_state != nullptr) retry_state->retry_dispatched = true;
+}
+
+bool CallData::MaybeRetry(grpc_call_element* elem,
+                          SubchannelCallBatchData* batch_data,
+                          grpc_status_code status,
+                          grpc_mdelem* server_pushback_md) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  // Get retry policy.
+  if (method_params_ == nullptr) return false;
+  const auto* retry_policy = method_params_->retry_policy();
+  if (retry_policy == nullptr) return false;
+  // If we've already dispatched a retry from this call, return true.
+  // This catches the case where the batch has multiple callbacks
+  // (i.e., it includes either recv_message or recv_initial_metadata).
+  SubchannelCallRetryState* retry_state = nullptr;
+  if (batch_data != nullptr) {
+    retry_state = static_cast<SubchannelCallRetryState*>(
+        batch_data->subchannel_call->GetParentData());
+    if (retry_state->retry_dispatched) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
+                this);
+      }
+      return true;
+    }
+  }
+  // Check status.
+  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
+    if (retry_throttle_data_ != nullptr) {
+      retry_throttle_data_->RecordSuccess();
+    }
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
+    }
+    return false;
+  }
+  // Status is not OK.  Check whether the status is retryable.
+  if (!retry_policy->retryable_status_codes.Contains(status)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: status %s not configured as retryable", chand,
+              this, grpc_status_code_to_string(status));
+    }
+    return false;
+  }
+  // Record the failure and check whether retries are throttled.
+  // Note that it's important for this check to come after the status
+  // code check above, since we should only record failures whose statuses
+  // match the configured retryable status codes, so that we don't count
+  // things like failures due to malformed requests (INVALID_ARGUMENT).
+  // Conversely, it's important for this to come before the remaining
+  // checks, so that we don't fail to record failures due to other factors.
+  if (retry_throttle_data_ != nullptr &&
+      !retry_throttle_data_->RecordFailure()) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
+    }
+    return false;
+  }
+  // Check whether the call is committed.
+  if (retry_committed_) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
+              this);
+    }
+    return false;
+  }
+  // Check whether we have retries remaining.
+  ++num_attempts_completed_;
+  if (num_attempts_completed_ >= retry_policy->max_attempts) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
+              this, retry_policy->max_attempts);
+    }
+    return false;
+  }
+  // If the call was cancelled from the surface, don't retry.
+  if (cancel_error_ != GRPC_ERROR_NONE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: call cancelled from surface, not retrying",
+              chand, this);
+    }
+    return false;
+  }
+  // Check server push-back.
+  grpc_millis server_pushback_ms = -1;
+  if (server_pushback_md != nullptr) {
+    // If the value is "-1" or any other unparseable string, we do not retry.
+    uint32_t ms;
+    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: not retrying due to server push-back",
+                chand, this);
+      }
+      return false;
+    } else {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
+                chand, this, ms);
+      }
+      server_pushback_ms = (grpc_millis)ms;
+    }
+  }
+  DoRetry(elem, retry_state, server_pushback_ms);
+  return true;
+}
+
+//
+// CallData::SubchannelCallBatchData
+//
+
+CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
+    grpc_call_element* elem, int refcount, bool set_on_complete) {
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
+                                                     set_on_complete);
+}
+
+CallData::SubchannelCallBatchData::SubchannelCallBatchData(
+    grpc_call_element* elem, CallData* calld, int refcount,
+    bool set_on_complete)
+    : elem(elem), subchannel_call(calld->subchannel_call_) {
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          calld->subchannel_call_->GetParentData());
+  batch.payload = &retry_state->batch_payload;
+  gpr_ref_init(&refs, refcount);
+  if (set_on_complete) {
+    GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
+                      grpc_schedule_on_exec_ctx);
+    batch.on_complete = &on_complete;
+  }
+  GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
+}
+
+void CallData::SubchannelCallBatchData::Destroy() {
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
+  if (batch.send_initial_metadata) {
+    grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
+  }
+  if (batch.send_trailing_metadata) {
+    grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
+  }
+  if (batch.recv_initial_metadata) {
+    grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
+  }
+  if (batch.recv_trailing_metadata) {
+    grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
+  }
+  subchannel_call.reset();
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
+}
+
+//
+// recv_initial_metadata callback handling
+//
+
+void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
+  // Find pending batch.
+  PendingBatch* pending = calld->PendingBatchFind(
+      batch_data->elem, "invoking recv_initial_metadata_ready for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_initial_metadata &&
+               batch->payload->recv_initial_metadata
+                       .recv_initial_metadata_ready != nullptr;
+      });
+  GPR_ASSERT(pending != nullptr);
+  // Return metadata.
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  grpc_metadata_batch_move(
+      &retry_state->recv_initial_metadata,
+      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
+  // Update bookkeeping.
+  // Note: Need to do this before invoking the callback, since invoking
+  // the callback will result in yielding the call combiner.
+  grpc_closure* recv_initial_metadata_ready =
+      pending->batch->payload->recv_initial_metadata
+          .recv_initial_metadata_ready;
+  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+      nullptr;
+  calld->MaybeClearPendingBatch(batch_data->elem, pending);
+  batch_data->Unref();
+  // Invoke callback.
+  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
+}
+
+void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  grpc_call_element* elem = batch_data->elem;
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
+            chand, calld, grpc_error_string(error));
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  retry_state->completed_recv_initial_metadata = true;
+  // If a retry was already dispatched, then we're not going to use the
+  // result of this recv_initial_metadata op, so do nothing.
+  if (retry_state->retry_dispatched) {
+    GRPC_CALL_COMBINER_STOP(
+        calld->call_combiner_,
+        "recv_initial_metadata_ready after retry dispatched");
+    return;
+  }
+  // If we got an error or a Trailers-Only response and have not yet gotten
+  // the recv_trailing_metadata_ready callback, then defer propagating this
+  // callback back to the surface.  We can evaluate whether to retry when
+  // recv_trailing_metadata comes back.
+  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
+                    error != GRPC_ERROR_NONE) &&
+                   !retry_state->completed_recv_trailing_metadata)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
+              "(Trailers-Only)",
+              chand, calld);
+    }
+    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
+    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
+    if (!retry_state->started_recv_trailing_metadata) {
+      // recv_trailing_metadata not yet started by application; start it
+      // ourselves to get status.
+      calld->StartInternalRecvTrailingMetadata(elem);
+    } else {
+      GRPC_CALL_COMBINER_STOP(
+          calld->call_combiner_,
+          "recv_initial_metadata_ready trailers-only or error");
+    }
+    return;
+  }
+  // Received valid initial metadata, so commit the call.
+  calld->RetryCommit(elem, retry_state);
+  // Invoke the callback to return the result to the surface.
+  // Manually invoking a callback function; it does not take ownership of error.
+  calld->InvokeRecvInitialMetadataCallback(batch_data, error);
+}
+
+//
+// recv_message callback handling
+//
+
+void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
+  // Find pending op.
+  PendingBatch* pending = calld->PendingBatchFind(
+      batch_data->elem, "invoking recv_message_ready for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_message &&
+               batch->payload->recv_message.recv_message_ready != nullptr;
+      });
+  GPR_ASSERT(pending != nullptr);
+  // Return payload.
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  *pending->batch->payload->recv_message.recv_message =
+      std::move(retry_state->recv_message);
+  // Update bookkeeping.
+  // Note: Need to do this before invoking the callback, since invoking
+  // the callback will result in yielding the call combiner.
+  grpc_closure* recv_message_ready =
+      pending->batch->payload->recv_message.recv_message_ready;
+  pending->batch->payload->recv_message.recv_message_ready = nullptr;
+  calld->MaybeClearPendingBatch(batch_data->elem, pending);
+  batch_data->Unref();
+  // Invoke callback.
+  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
+}
+
+void CallData::RecvMessageReady(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  grpc_call_element* elem = batch_data->elem;
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
+            chand, calld, grpc_error_string(error));
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  ++retry_state->completed_recv_message_count;
+  // If a retry was already dispatched, then we're not going to use the
+  // result of this recv_message op, so do nothing.
+  if (retry_state->retry_dispatched) {
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
+                            "recv_message_ready after retry dispatched");
+    return;
+  }
+  // If we got an error or the payload was nullptr and we have not yet gotten
+  // the recv_trailing_metadata_ready callback, then defer propagating this
+  // callback back to the surface.  We can evaluate whether to retry when
+  // recv_trailing_metadata comes back.
+  if (GPR_UNLIKELY(
+          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
+          !retry_state->completed_recv_trailing_metadata)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
+              "message and recv_trailing_metadata pending)",
+              chand, calld);
+    }
+    retry_state->recv_message_ready_deferred_batch = batch_data;
+    retry_state->recv_message_error = GRPC_ERROR_REF(error);
+    if (!retry_state->started_recv_trailing_metadata) {
+      // recv_trailing_metadata not yet started by application; start it
+      // ourselves to get status.
+      calld->StartInternalRecvTrailingMetadata(elem);
+    } else {
+      GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
+    }
+    return;
+  }
+  // Received a valid message, so commit the call.
+  calld->RetryCommit(elem, retry_state);
+  // Invoke the callback to return the result to the surface.
+  // Manually invoking a callback function; it does not take ownership of error.
+  calld->InvokeRecvMessageCallback(batch_data, error);
+}
+
+//
+// recv_trailing_metadata handling
+//
+
+void CallData::GetCallStatus(grpc_call_element* elem,
+                             grpc_metadata_batch* md_batch, grpc_error* error,
+                             grpc_status_code* status,
+                             grpc_mdelem** server_pushback_md) {
+  if (error != GRPC_ERROR_NONE) {
+    grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
+  } else {
+    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+    *status =
+        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+    if (server_pushback_md != nullptr &&
+        md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+void CallData::AddClosureForRecvTrailingMetadataReady(
+    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+    grpc_error* error, CallCombinerClosureList* closures) {
+  // Find pending batch.
+  PendingBatch* pending = PendingBatchFind(
+      elem, "invoking recv_trailing_metadata for",
+      [](grpc_transport_stream_op_batch* batch) {
+        return batch->recv_trailing_metadata &&
+               batch->payload->recv_trailing_metadata
+                       .recv_trailing_metadata_ready != nullptr;
+      });
+  // If we generated the recv_trailing_metadata op internally via
+  // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
+  if (pending == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
+  // Return metadata.
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  grpc_metadata_batch_move(
+      &retry_state->recv_trailing_metadata,
+      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
+  // Add closure.
+  closures->Add(pending->batch->payload->recv_trailing_metadata
+                    .recv_trailing_metadata_ready,
+                error, "recv_trailing_metadata_ready for pending batch");
+  // Update bookkeeping.
+  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      nullptr;
+  MaybeClearPendingBatch(elem, pending);
+}
+
+void CallData::AddClosuresForDeferredRecvCallbacks(
+    SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
+    CallCombinerClosureList* closures) {
+  if (batch_data->batch.recv_trailing_metadata) {
+    // Add closure for deferred recv_initial_metadata_ready.
+    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
+                     nullptr)) {
+      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
+                        InvokeRecvInitialMetadataCallback,
+                        retry_state->recv_initial_metadata_ready_deferred_batch,
+                        grpc_schedule_on_exec_ctx);
+      closures->Add(&retry_state->recv_initial_metadata_ready,
+                    retry_state->recv_initial_metadata_error,
+                    "resuming recv_initial_metadata_ready");
+      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+    }
+    // Add closure for deferred recv_message_ready.
+    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
+                     nullptr)) {
+      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
+                        InvokeRecvMessageCallback,
+                        retry_state->recv_message_ready_deferred_batch,
+                        grpc_schedule_on_exec_ctx);
+      closures->Add(&retry_state->recv_message_ready,
+                    retry_state->recv_message_error,
+                    "resuming recv_message_ready");
+      retry_state->recv_message_ready_deferred_batch = nullptr;
+    }
+  }
+}
+
+bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
+                                       SubchannelCallRetryState* retry_state) {
+  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
+    return false;
+  }
+  if (pending->batch->send_initial_metadata &&
+      !retry_state->started_send_initial_metadata) {
+    return true;
+  }
+  if (pending->batch->send_message &&
+      retry_state->started_send_message_count < send_messages_.size()) {
+    return true;
+  }
+  if (pending->batch->send_trailing_metadata &&
+      !retry_state->started_send_trailing_metadata) {
+    return true;
+  }
+  return false;
+}
+
+void CallData::AddClosuresToFailUnstartedPendingBatches(
+    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+    grpc_error* error, CallCombinerClosureList* closures) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    PendingBatch* pending = &pending_batches_[i];
+    if (PendingBatchIsUnstarted(pending, retry_state)) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+        gpr_log(GPR_INFO,
+                "chand=%p calld=%p: failing unstarted pending batch at index "
+                "%" PRIuPTR,
+                chand, this, i);
+      }
+      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
+                    "failing on_complete for pending batch");
+      pending->batch->on_complete = nullptr;
+      MaybeClearPendingBatch(elem, pending);
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
+                                           grpc_error* error) {
+  grpc_call_element* elem = batch_data->elem;
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  // Construct list of closures to execute.
+  CallCombinerClosureList closures;
+  // First, add closure for recv_trailing_metadata_ready.
+  AddClosureForRecvTrailingMetadataReady(elem, batch_data,
+                                         GRPC_ERROR_REF(error), &closures);
+  // If there are deferred recv_initial_metadata_ready or recv_message_ready
+  // callbacks, add them to closures.
+  AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
+  // Add closures to fail any pending batches that have not yet been started.
+  AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
+                                           GRPC_ERROR_REF(error), &closures);
+  // Don't need batch_data anymore.
+  batch_data->Unref();
+  // Schedule all of the closures identified above.
+  // Note: This will release the call combiner.
+  closures.RunClosures(call_combiner_);
+  GRPC_ERROR_UNREF(error);
+}
+
+void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  grpc_call_element* elem = batch_data->elem;
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
+            chand, calld, grpc_error_string(error));
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  retry_state->completed_recv_trailing_metadata = true;
+  // Get the call's status and check for server pushback metadata.
+  grpc_status_code status = GRPC_STATUS_OK;
+  grpc_mdelem* server_pushback_md = nullptr;
+  grpc_metadata_batch* md_batch =
+      batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
+  calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
+                       &server_pushback_md);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+            calld, grpc_status_code_to_string(status));
+  }
+  // Check if we should retry.
+  if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
+    // Unref batch_data for deferred recv_initial_metadata_ready or
+    // recv_message_ready callbacks, if any.
+    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+      batch_data->Unref();
+      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+    }
+    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+      batch_data->Unref();
+      GRPC_ERROR_UNREF(retry_state->recv_message_error);
+    }
+    batch_data->Unref();
+    return;
+  }
+  // Not retrying, so commit the call.
+  calld->RetryCommit(elem, retry_state);
+  // Run any necessary closures.
+  calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
+}
+
+//
+// on_complete callback handling
+//
+
+void CallData::AddClosuresForCompletedPendingBatch(
+    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+    SubchannelCallRetryState* retry_state, grpc_error* error,
+    CallCombinerClosureList* closures) {
+  PendingBatch* pending = PendingBatchFind(
+      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
+        // Match the pending batch with the same set of send ops as the
+        // subchannel batch we've just completed.
+        return batch->on_complete != nullptr &&
+               batch_data->batch.send_initial_metadata ==
+                   batch->send_initial_metadata &&
+               batch_data->batch.send_message == batch->send_message &&
+               batch_data->batch.send_trailing_metadata ==
+                   batch->send_trailing_metadata;
+      });
+  // If batch_data is a replay batch, then there will be no pending
+  // batch to complete.
+  if (pending == nullptr) {
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
+  // Add closure.
+  closures->Add(pending->batch->on_complete, error,
+                "on_complete for pending batch");
+  pending->batch->on_complete = nullptr;
+  MaybeClearPendingBatch(elem, pending);
+}
+
+void CallData::AddClosuresForReplayOrPendingSendOps(
+    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
+    SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  bool have_pending_send_message_ops =
+      retry_state->started_send_message_count < send_messages_.size();
+  bool have_pending_send_trailing_metadata_op =
+      seen_send_trailing_metadata_ &&
+      !retry_state->started_send_trailing_metadata;
+  if (!have_pending_send_message_ops &&
+      !have_pending_send_trailing_metadata_op) {
+    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+      PendingBatch* pending = &pending_batches_[i];
+      grpc_transport_stream_op_batch* batch = pending->batch;
+      if (batch == nullptr || pending->send_ops_cached) continue;
+      if (batch->send_message) have_pending_send_message_ops = true;
+      if (batch->send_trailing_metadata) {
+        have_pending_send_trailing_metadata_op = true;
+      }
+    }
+  }
+  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: starting next batch for pending send op(s)",
+              chand, this);
+    }
+    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
+                      StartRetriableSubchannelBatches, elem,
+                      grpc_schedule_on_exec_ctx);
+    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
+                  "starting next batch for send_* op(s)");
+  }
+}
+
+void CallData::OnComplete(void* arg, grpc_error* error) {
+  SubchannelCallBatchData* batch_data =
+      static_cast<SubchannelCallBatchData*>(arg);
+  grpc_call_element* elem = batch_data->elem;
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
+    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
+            chand, calld, grpc_error_string(error), batch_str);
+    gpr_free(batch_str);
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          batch_data->subchannel_call->GetParentData());
+  // Update bookkeeping in retry_state.
+  if (batch_data->batch.send_initial_metadata) {
+    retry_state->completed_send_initial_metadata = true;
+  }
+  if (batch_data->batch.send_message) {
+    ++retry_state->completed_send_message_count;
+  }
+  if (batch_data->batch.send_trailing_metadata) {
+    retry_state->completed_send_trailing_metadata = true;
+  }
+  // If the call is committed, free cached data for send ops that we've just
+  // completed.
+  if (calld->retry_committed_) {
+    calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
+  }
+  // Construct list of closures to execute.
+  CallCombinerClosureList closures;
+  // If a retry was already dispatched, that means we saw
+  // recv_trailing_metadata before this, so we do nothing here.
+  // Otherwise, invoke the callback to return the result to the surface.
+  if (!retry_state->retry_dispatched) {
+    // Add closure for the completed pending batch, if any.
+    calld->AddClosuresForCompletedPendingBatch(
+        elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
+    // If needed, add a callback to start any replay or pending send ops on
+    // the subchannel call.
+    if (!retry_state->completed_recv_trailing_metadata) {
+      calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
+                                                  &closures);
+    }
+  }
+  // Track number of pending subchannel send batches and determine if this
+  // was the last one.
+  --calld->num_pending_retriable_subchannel_send_batches_;
+  const bool last_send_batch_complete =
+      calld->num_pending_retriable_subchannel_send_batches_ == 0;
+  // Don't need batch_data anymore.
+  batch_data->Unref();
+  // Schedule all of the closures identified above.
+  // Note: This yeilds the call combiner.
+  closures.RunClosures(calld->call_combiner_);
+  // If this was the last subchannel send batch, unref the call stack.
+  if (last_send_batch_complete) {
+    GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
+  }
+}
+
+//
+// subchannel batch construction
+//
+
+void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  SubchannelCall* subchannel_call =
+      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  subchannel_call->StartTransportStreamOpBatch(batch);
+}
+
+void CallData::AddClosureForSubchannelBatch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
+    CallCombinerClosureList* closures) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  batch->handler_private.extra_arg = subchannel_call_.get();
+  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+                    batch, grpc_schedule_on_exec_ctx);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    char* batch_str = grpc_transport_stream_op_batch_string(batch);
+    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
+            this, batch_str);
+    gpr_free(batch_str);
+  }
+  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
+                "start_subchannel_batch");
+}
+
+void CallData::AddRetriableSendInitialMetadataOp(
+    SubchannelCallRetryState* retry_state,
+    SubchannelCallBatchData* batch_data) {
+  // Maps the number of retries to the corresponding metadata value slice.
+  const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
+                                             &GRPC_MDSTR_3, &GRPC_MDSTR_4};
+  // We need to make a copy of the metadata batch for each attempt, since
+  // the filters in the subchannel stack may modify this batch, and we don't
+  // want those modifications to be passed forward to subsequent attempts.
+  //
+  // If we've already completed one or more attempts, add the
+  // grpc-retry-attempts header.
+  retry_state->send_initial_metadata_storage =
+      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+          sizeof(grpc_linked_mdelem) *
+          (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
+  grpc_metadata_batch_copy(&send_initial_metadata_,
+                           &retry_state->send_initial_metadata,
+                           retry_state->send_initial_metadata_storage);
+  if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
+                       .grpc_previous_rpc_attempts != nullptr)) {
+    grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
+                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
+  }
+  if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
+    grpc_mdelem retry_md = grpc_mdelem_create(
+        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
+        *retry_count_strings[num_attempts_completed_ - 1], nullptr);
+    grpc_error* error = grpc_metadata_batch_add_tail(
+        &retry_state->send_initial_metadata,
+        &retry_state
+             ->send_initial_metadata_storage[send_initial_metadata_.list.count],
+        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
+    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
+              grpc_error_string(error));
+      GPR_ASSERT(false);
+    }
+  }
+  retry_state->started_send_initial_metadata = true;
+  batch_data->batch.send_initial_metadata = true;
+  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
+      &retry_state->send_initial_metadata;
+  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
+      send_initial_metadata_flags_;
+  batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
+}
+
+void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
+                                         SubchannelCallRetryState* retry_state,
+                                         SubchannelCallBatchData* batch_data) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
+            chand, this, retry_state->started_send_message_count);
+  }
+  ByteStreamCache* cache =
+      send_messages_[retry_state->started_send_message_count];
+  ++retry_state->started_send_message_count;
+  retry_state->send_message.Init(cache);
+  batch_data->batch.send_message = true;
+  batch_data->batch.payload->send_message.send_message.reset(
+      retry_state->send_message.get());
+}
+
+void CallData::AddRetriableSendTrailingMetadataOp(
+    SubchannelCallRetryState* retry_state,
+    SubchannelCallBatchData* batch_data) {
+  // We need to make a copy of the metadata batch for each attempt, since
+  // the filters in the subchannel stack may modify this batch, and we don't
+  // want those modifications to be passed forward to subsequent attempts.
+  retry_state->send_trailing_metadata_storage =
+      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
+          sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
+  grpc_metadata_batch_copy(&send_trailing_metadata_,
+                           &retry_state->send_trailing_metadata,
+                           retry_state->send_trailing_metadata_storage);
+  retry_state->started_send_trailing_metadata = true;
+  batch_data->batch.send_trailing_metadata = true;
+  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
+      &retry_state->send_trailing_metadata;
+}
+
+void CallData::AddRetriableRecvInitialMetadataOp(
+    SubchannelCallRetryState* retry_state,
+    SubchannelCallBatchData* batch_data) {
+  retry_state->started_recv_initial_metadata = true;
+  batch_data->batch.recv_initial_metadata = true;
+  grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
+  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
+      &retry_state->recv_initial_metadata;
+  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
+      &retry_state->trailing_metadata_available;
+  GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
+                    RecvInitialMetadataReady, batch_data,
+                    grpc_schedule_on_exec_ctx);
+  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
+      &retry_state->recv_initial_metadata_ready;
+}
+
+void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
+                                         SubchannelCallBatchData* batch_data) {
+  ++retry_state->started_recv_message_count;
+  batch_data->batch.recv_message = true;
+  batch_data->batch.payload->recv_message.recv_message =
+      &retry_state->recv_message;
+  GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
+                    batch_data, grpc_schedule_on_exec_ctx);
+  batch_data->batch.payload->recv_message.recv_message_ready =
+      &retry_state->recv_message_ready;
+}
+
+void CallData::AddRetriableRecvTrailingMetadataOp(
+    SubchannelCallRetryState* retry_state,
+    SubchannelCallBatchData* batch_data) {
+  retry_state->started_recv_trailing_metadata = true;
+  batch_data->batch.recv_trailing_metadata = true;
+  grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
+  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
+      &retry_state->recv_trailing_metadata;
+  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+      &retry_state->collect_stats;
+  GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
+                    RecvTrailingMetadataReady, batch_data,
+                    grpc_schedule_on_exec_ctx);
+  batch_data->batch.payload->recv_trailing_metadata
+      .recv_trailing_metadata_ready =
+      &retry_state->recv_trailing_metadata_ready;
+  MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+      &batch_data->batch);
+}
+
+void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
+            "started; starting it internally",
+            chand, this);
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
+  // Create batch_data with 2 refs, since this batch will be unreffed twice:
+  // once for the recv_trailing_metadata_ready callback when the subchannel
+  // batch returns, and again when we actually get a recv_trailing_metadata
+  // op from the surface.
+  SubchannelCallBatchData* batch_data =
+      SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
+  AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
+  retry_state->recv_trailing_metadata_internal_batch = batch_data;
+  // Note: This will release the call combiner.
+  subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
+}
+
+// If there are any cached send ops that need to be replayed on the
+// current subchannel call, creates and returns a new subchannel batch
+// to replay those ops.  Otherwise, returns nullptr.
+CallData::SubchannelCallBatchData*
+CallData::MaybeCreateSubchannelBatchForReplay(
+    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  SubchannelCallBatchData* replay_batch_data = nullptr;
+  // send_initial_metadata.
+  if (seen_send_initial_metadata_ &&
+      !retry_state->started_send_initial_metadata &&
+      !pending_send_initial_metadata_) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: replaying previously completed "
+              "send_initial_metadata op",
+              chand, this);
+    }
+    replay_batch_data =
+        SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
+    AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
+  }
+  // send_message.
+  // Note that we can only have one send_message op in flight at a time.
+  if (retry_state->started_send_message_count < send_messages_.size() &&
+      retry_state->started_send_message_count ==
+          retry_state->completed_send_message_count &&
+      !pending_send_message_) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: replaying previously completed "
+              "send_message op",
+              chand, this);
+    }
+    if (replay_batch_data == nullptr) {
+      replay_batch_data =
+          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
+    }
+    AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
+  }
+  // send_trailing_metadata.
+  // Note that we only add this op if we have no more send_message ops
+  // to start, since we can't send down any more send_message ops after
+  // send_trailing_metadata.
+  if (seen_send_trailing_metadata_ &&
+      retry_state->started_send_message_count == send_messages_.size() &&
+      !retry_state->started_send_trailing_metadata &&
+      !pending_send_trailing_metadata_) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: replaying previously completed "
+              "send_trailing_metadata op",
+              chand, this);
+    }
+    if (replay_batch_data == nullptr) {
+      replay_batch_data =
+          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
+    }
+    AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
+  }
+  return replay_batch_data;
+}
+
+void CallData::AddSubchannelBatchesForPendingBatches(
+    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
+    CallCombinerClosureList* closures) {
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
+    PendingBatch* pending = &pending_batches_[i];
+    grpc_transport_stream_op_batch* batch = pending->batch;
+    if (batch == nullptr) continue;
+    // Skip any batch that either (a) has already been started on this
+    // subchannel call or (b) we can't start yet because we're still
+    // replaying send ops that need to be completed first.
+    // TODO(roth): Note that if any one op in the batch can't be sent
+    // yet due to ops that we're replaying, we don't start any of the ops
+    // in the batch.  This is probably okay, but it could conceivably
+    // lead to increased latency in some cases -- e.g., we could delay
+    // starting a recv op due to it being in the same batch with a send
+    // op.  If/when we revamp the callback protocol in
+    // transport_stream_op_batch, we may be able to fix this.
+    if (batch->send_initial_metadata &&
+        retry_state->started_send_initial_metadata) {
+      continue;
+    }
+    if (batch->send_message && retry_state->completed_send_message_count <
+                                   retry_state->started_send_message_count) {
+      continue;
+    }
+    // Note that we only start send_trailing_metadata if we have no more
+    // send_message ops to start, since we can't send down any more
+    // send_message ops after send_trailing_metadata.
+    if (batch->send_trailing_metadata &&
+        (retry_state->started_send_message_count + batch->send_message <
+             send_messages_.size() ||
+         retry_state->started_send_trailing_metadata)) {
+      continue;
+    }
+    if (batch->recv_initial_metadata &&
+        retry_state->started_recv_initial_metadata) {
+      continue;
+    }
+    if (batch->recv_message && retry_state->completed_recv_message_count <
+                                   retry_state->started_recv_message_count) {
+      continue;
+    }
+    if (batch->recv_trailing_metadata &&
+        retry_state->started_recv_trailing_metadata) {
+      // If we previously completed a recv_trailing_metadata op
+      // initiated by StartInternalRecvTrailingMetadata(), use the
+      // result of that instead of trying to re-start this op.
+      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
+                        nullptr))) {
+        // If the batch completed, then trigger the completion callback
+        // directly, so that we return the previously returned results to
+        // the application.  Otherwise, just unref the internally
+        // started subchannel batch, since we'll propagate the
+        // completion when it completes.
+        if (retry_state->completed_recv_trailing_metadata) {
+          // Batches containing recv_trailing_metadata always succeed.
+          closures->Add(
+              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
+              "re-executing recv_trailing_metadata_ready to propagate "
+              "internally triggered result");
+        } else {
+          retry_state->recv_trailing_metadata_internal_batch->Unref();
+        }
+        retry_state->recv_trailing_metadata_internal_batch = nullptr;
+      }
+      continue;
+    }
+    // If we're not retrying, just send the batch as-is.
+    if (method_params_ == nullptr ||
+        method_params_->retry_policy() == nullptr || retry_committed_) {
+      // TODO(roth) : We should probably call
+      // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
+      AddClosureForSubchannelBatch(elem, batch, closures);
+      PendingBatchClear(pending);
+      continue;
+    }
+    // Create batch with the right number of callbacks.
+    const bool has_send_ops = batch->send_initial_metadata ||
+                              batch->send_message ||
+                              batch->send_trailing_metadata;
+    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
+                              batch->recv_message +
+                              batch->recv_trailing_metadata;
+    SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
+        elem, num_callbacks, has_send_ops /* set_on_complete */);
+    // Cache send ops if needed.
+    MaybeCacheSendOpsForBatch(pending);
+    // send_initial_metadata.
+    if (batch->send_initial_metadata) {
+      AddRetriableSendInitialMetadataOp(retry_state, batch_data);
+    }
+    // send_message.
+    if (batch->send_message) {
+      AddRetriableSendMessageOp(elem, retry_state, batch_data);
+    }
+    // send_trailing_metadata.
+    if (batch->send_trailing_metadata) {
+      AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
+    }
+    // recv_initial_metadata.
+    if (batch->recv_initial_metadata) {
+      // recv_flags is only used on the server side.
+      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
+      AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
+    }
+    // recv_message.
+    if (batch->recv_message) {
+      AddRetriableRecvMessageOp(retry_state, batch_data);
+    }
+    // recv_trailing_metadata.
+    if (batch->recv_trailing_metadata) {
+      AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
+    }
+    AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
+    // Track number of pending subchannel send batches.
+    // If this is the first one, take a ref to the call stack.
+    if (batch->send_initial_metadata || batch->send_message ||
+        batch->send_trailing_metadata) {
+      if (num_pending_retriable_subchannel_send_batches_ == 0) {
+        GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
+      }
+      ++num_pending_retriable_subchannel_send_batches_;
+    }
+  }
+}
+
+void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
+            chand, calld);
+  }
+  SubchannelCallRetryState* retry_state =
+      static_cast<SubchannelCallRetryState*>(
+          calld->subchannel_call_->GetParentData());
+  // Construct list of closures to execute, one for each pending batch.
+  CallCombinerClosureList closures;
+  // Replay previously-returned send_* ops if needed.
+  SubchannelCallBatchData* replay_batch_data =
+      calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
+  if (replay_batch_data != nullptr) {
+    calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
+                                        &closures);
+    // Track number of pending subchannel send batches.
+    // If this is the first one, take a ref to the call stack.
+    if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
+      GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
+    }
+    ++calld->num_pending_retriable_subchannel_send_batches_;
+  }
+  // Now add pending batches.
+  calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
+  // Start batches on subchannel call.
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: starting %" PRIuPTR
+            " retriable batches on subchannel_call=%p",
+            chand, calld, closures.size(), calld->subchannel_call_.get());
+  }
+  // Note: This will yield the call combiner.
+  closures.RunClosures(calld->call_combiner_);
+}
+
+//
+// LB pick
+//
+
+void CallData::CreateSubchannelCall(grpc_call_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  const size_t parent_data_size =
+      enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
+  SubchannelCall::Args call_args = {
+      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
+      deadline_, arena_,
+      // TODO(roth): When we implement hedging support, we will probably
+      // need to use a separate call context for each subchannel call.
+      call_context_, call_combiner_, parent_data_size};
+  grpc_error* error = GRPC_ERROR_NONE;
+  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
+            chand, this, subchannel_call_.get(), grpc_error_string(error));
+  }
+  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+    PendingBatchesFail(elem, error, YieldCallCombiner);
+  } else {
+    if (parent_data_size > 0) {
+      new (subchannel_call_->GetParentData())
+          SubchannelCallRetryState(call_context_);
+    }
+    PendingBatchesResume(elem);
+  }
+}
+
+void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
+  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_SCHED(&pick_closure_, error);
+}
+
+void CallData::PickDone(void* arg, grpc_error* error) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  if (error != GRPC_ERROR_NONE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
+              calld, grpc_error_string(error));
+    }
+    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
+    return;
+  }
+  calld->CreateSubchannelCall(elem);
+}
+
+// A class to handle the call combiner cancellation callback for a
+// queued pick.
+class CallData::QueuedPickCanceller {
+ public:
+  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
+    auto* calld = static_cast<CallData*>(elem->call_data);
+    GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
+    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
+                      grpc_schedule_on_exec_ctx);
+    calld->call_combiner_->SetNotifyOnCancel(&closure_);
+  }
+
+ private:
+  static void CancelLocked(void* arg, grpc_error* error) {
+    auto* self = static_cast<QueuedPickCanceller*>(arg);
+    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
+    auto* calld = static_cast<CallData*>(self->elem_->call_data);
+    MutexLock lock(chand->data_plane_mu());
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p: cancelling queued pick: "
+              "error=%s self=%p calld->pick_canceller=%p",
+              chand, calld, grpc_error_string(error), self,
+              calld->pick_canceller_);
+    }
+    if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
+      // Remove pick from list of queued picks.
+      calld->RemoveCallFromQueuedPicksLocked(self->elem_);
+      // Fail pending batches on the call.
+      calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
+                                YieldCallCombinerIfPendingBatchesFound);
+    }
+    GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
+    Delete(self);
+  }
+
+  grpc_call_element* elem_;
+  grpc_closure closure_;
+};
+
+void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
+  auto* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
+            chand, this);
+  }
+  chand->RemoveQueuedPick(&pick_, pollent_);
+  pick_queued_ = false;
+  // Lame the call combiner canceller.
+  pick_canceller_ = nullptr;
+}
+
+void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
+  auto* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
+            this);
+  }
+  pick_queued_ = true;
+  pick_.elem = elem;
+  chand->AddQueuedPick(&pick_, pollent_);
+  // Register call combiner cancellation callback.
+  pick_canceller_ = New<QueuedPickCanceller>(elem);
+}
+
+void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
+            chand, this);
+  }
+  // Store a ref to the service_config in service_config_call_data_. Also, save
+  // a pointer to this in the call_context so that all future filters can access
+  // it.
+  service_config_call_data_ =
+      ServiceConfig::CallData(chand->service_config(), path_);
+  if (service_config_call_data_.service_config() != nullptr) {
+    call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
+        &service_config_call_data_;
+    method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
+        service_config_call_data_.GetMethodParsedConfig(
+            internal::ClientChannelServiceConfigParser::ParserIndex()));
+  }
+  retry_throttle_data_ = chand->retry_throttle_data();
+  if (method_params_ != nullptr) {
+    // If the deadline from the service config is shorter than the one
+    // from the client API, reset the deadline timer.
+    if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
+      const grpc_millis per_method_deadline =
+          grpc_cycle_counter_to_millis_round_up(call_start_time_) +
+          method_params_->timeout();
+      if (per_method_deadline < deadline_) {
+        deadline_ = per_method_deadline;
+        grpc_deadline_state_reset(elem, deadline_);
+      }
+    }
+    // If the service config set wait_for_ready and the application
+    // did not explicitly set it, use the value from the service config.
+    uint32_t* send_initial_metadata_flags =
+        &pending_batches_[0]
+             .batch->payload->send_initial_metadata.send_initial_metadata_flags;
+    if (method_params_->wait_for_ready().has_value() &&
+        !(*send_initial_metadata_flags &
+          GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
+      if (method_params_->wait_for_ready().value()) {
+        *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+      } else {
+        *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+      }
+    }
+  }
+  // If no retry policy, disable retries.
+  // TODO(roth): Remove this when adding support for transparent retries.
+  if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
+    enable_retries_ = false;
+  }
+}
+
+void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  // Apply service config data to the call only once, and only if the
+  // channel has the data available.
+  if (GPR_LIKELY(chand->received_service_config_data() &&
+                 !service_config_applied_)) {
+    service_config_applied_ = true;
+    ApplyServiceConfigToCallLocked(elem);
+  }
+}
+
+const char* PickResultTypeName(
+    LoadBalancingPolicy::PickResult::ResultType type) {
+  switch (type) {
+    case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
+      return "COMPLETE";
+    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
+      return "QUEUE";
+    case LoadBalancingPolicy::PickResult::PICK_FAILED:
+      return "FAILED";
+  }
+  GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+void CallData::PickSubchannel(void* arg, grpc_error* error) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  bool pick_complete;
+  {
+    MutexLock lock(chand->data_plane_mu());
+    pick_complete = calld->PickSubchannelLocked(elem, &error);
+  }
+  if (pick_complete) {
+    PickDone(elem, error);
+    GRPC_ERROR_UNREF(error);
+  }
+}
+
+bool CallData::PickSubchannelLocked(grpc_call_element* elem,
+                                    grpc_error** error) {
+  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+  GPR_ASSERT(connected_subchannel_ == nullptr);
+  GPR_ASSERT(subchannel_call_ == nullptr);
+  // The picker being null means that the channel is currently in IDLE state.
+  // The incoming call will make the channel exit IDLE.
+  if (chand->picker() == nullptr) {
+    // Bounce into the control plane combiner to exit IDLE.
+    chand->CheckConnectivityState(/*try_to_connect=*/true);
+    // Queue the pick, so that it will be attempted once the channel
+    // becomes connected.
+    AddCallToQueuedPicksLocked(elem);
+    return false;
+  }
+  // Apply service config to call if needed.
+  MaybeApplyServiceConfigToCallLocked(elem);
+  // If this is a retry, use the send_initial_metadata payload that
+  // we've cached; otherwise, use the pending batch.  The
+  // send_initial_metadata batch will be the first pending batch in the
+  // list, as set by GetBatchIndex() above.
+  // TODO(roth): What if the LB policy needs to add something to the
+  // call's initial metadata, and then there's a retry?  We don't want
+  // the new metadata to be added twice.  We might need to somehow
+  // allocate the subchannel batch earlier so that we can give the
+  // subchannel's copy of the metadata batch (which is copied for each
+  // attempt) to the LB policy instead the one from the parent channel.
+  LoadBalancingPolicy::PickArgs pick_args;
+  pick_args.call_state = &lb_call_state_;
+  Metadata initial_metadata(
+      this,
+      seen_send_initial_metadata_
+          ? &send_initial_metadata_
+          : pending_batches_[0]
+                .batch->payload->send_initial_metadata.send_initial_metadata);
+  pick_args.initial_metadata = &initial_metadata;
+  // Grab initial metadata flags so that we can check later if the call has
+  // wait_for_ready enabled.
+  const uint32_t send_initial_metadata_flags =
+      seen_send_initial_metadata_ ? send_initial_metadata_flags_
+                                  : pending_batches_[0]
+                                        .batch->payload->send_initial_metadata
+                                        .send_initial_metadata_flags;
+  // Attempt pick.
+  auto result = chand->picker()->Pick(pick_args);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+    gpr_log(GPR_INFO,
+            "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
+            chand, this, PickResultTypeName(result.type),
+            result.subchannel.get(), grpc_error_string(result.error));
+  }
+  switch (result.type) {
+    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
+      // If we're shutting down, fail all RPCs.
+      grpc_error* disconnect_error = chand->disconnect_error();
+      if (disconnect_error != GRPC_ERROR_NONE) {
+        GRPC_ERROR_UNREF(result.error);
+        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+        *error = GRPC_ERROR_REF(disconnect_error);
+        return true;
+      }
+      // If wait_for_ready is false, then the error indicates the RPC
+      // attempt's final status.
+      if ((send_initial_metadata_flags &
+           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
+        // Retry if appropriate; otherwise, fail.
+        grpc_status_code status = GRPC_STATUS_OK;
+        grpc_error_get_status(result.error, deadline_, &status, nullptr,
+                              nullptr, nullptr);
+        const bool retried = enable_retries_ &&
+                             MaybeRetry(elem, nullptr /* batch_data */, status,
+                                        nullptr /* server_pushback_md */);
+        if (!retried) {
+          grpc_error* new_error =
+              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                  "Failed to pick subchannel", &result.error, 1);
+          GRPC_ERROR_UNREF(result.error);
+          *error = new_error;
+        }
+        if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+        return !retried;
+      }
+      // If wait_for_ready is true, then queue to retry when we get a new
+      // picker.
+      GRPC_ERROR_UNREF(result.error);
+    }
+    // Fallthrough
+    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
+      if (!pick_queued_) AddCallToQueuedPicksLocked(elem);
+      return false;
+    default:  // PICK_COMPLETE
+      if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
+      // Handle drops.
+      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
+        result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "Call dropped by load balancing policy");
+      } else {
+        // Grab a ref to the connected subchannel while we're still
+        // holding the data plane mutex.
+        connected_subchannel_ =
+            chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
+        GPR_ASSERT(connected_subchannel_ != nullptr);
+      }
+      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
+      lb_recv_trailing_metadata_ready_user_data_ =
+          result.recv_trailing_metadata_ready_user_data;
+      *error = result.error;
+      return true;
+  }
+}
+
+}  // namespace
+}  // namespace grpc_core
+
+/*************************************************************************
+ * EXPORTED SYMBOLS
+ */
+
+using grpc_core::CallData;
+using grpc_core::ChannelData;
+
+const grpc_channel_filter grpc_client_channel_filter = {
+    CallData::StartTransportStreamOpBatch,
+    ChannelData::StartTransportOp,
+    sizeof(CallData),
+    CallData::Init,
+    CallData::SetPollent,
+    CallData::Destroy,
+    sizeof(ChannelData),
+    ChannelData::Init,
+    ChannelData::Destroy,
+    ChannelData::GetChannelInfo,
+    "client-channel",
+};
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+    grpc_channel_element* elem, int try_to_connect) {
+  auto* chand = static_cast<ChannelData*>(elem->channel_data);
+  return chand->CheckConnectivityState(try_to_connect);
+}
+
+int grpc_client_channel_num_external_connectivity_watchers(
+    grpc_channel_element* elem) {
+  auto* chand = static_cast<ChannelData*>(elem->channel_data);
+  return chand->NumExternalConnectivityWatchers();
+}
+
+void grpc_client_channel_watch_connectivity_state(
+    grpc_channel_element* elem, grpc_polling_entity pollent,
+    grpc_connectivity_state* state, grpc_closure* closure,
+    grpc_closure* watcher_timer_init) {
+  auto* chand = static_cast<ChannelData*>(elem->channel_data);
+  return chand->AddExternalConnectivityWatcher(pollent, state, closure,
+                                               watcher_timer_init);
+}
+
+grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
+grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
+  auto* calld = static_cast<CallData*>(elem->call_data);
+  return calld->subchannel_call();
+}