Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / ext / filters / client_channel / subchannel.cc
diff --git a/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/subchannel.cc b/legacy-libs/grpc-cloned/deps/grpc/src/core/ext/filters/client_channel/subchannel.cc
new file mode 100644 (file)
index 0000000..e30d915
--- /dev/null
@@ -0,0 +1,1128 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/subchannel.h"
+
+#include <inttypes.h>
+#include <limits.h>
+
+#include <algorithm>
+#include <cstring>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/service_config.h"
+#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gpr/alloc.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/status_metadata.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+// Strong and weak refs.
+#define INTERNAL_REF_BITS 16
+#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
+
+// Backoff parameters.
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
+#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
+
+// Conversion between subchannel call and call stack.
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
+  (grpc_call_stack*)((char*)(call) +        \
+                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
+#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
+  (SubchannelCall*)(((char*)(call_stack)) -      \
+                    GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
+
+namespace grpc_core {
+
+TraceFlag grpc_trace_subchannel(false, "subchannel");
+DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
+
+//
+// ConnectedSubchannel
+//
+
+ConnectedSubchannel::ConnectedSubchannel(
+    grpc_channel_stack* channel_stack, const grpc_channel_args* args,
+    RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
+    : RefCounted<ConnectedSubchannel>(&grpc_trace_subchannel_refcount),
+      channel_stack_(channel_stack),
+      args_(grpc_channel_args_copy(args)),
+      channelz_subchannel_(std::move(channelz_subchannel)) {}
+
+ConnectedSubchannel::~ConnectedSubchannel() {
+  grpc_channel_args_destroy(args_);
+  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
+}
+
+void ConnectedSubchannel::NotifyOnStateChange(
+    grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+    grpc_closure* closure) {
+  grpc_transport_op* op = grpc_make_transport_op(nullptr);
+  grpc_channel_element* elem;
+  op->connectivity_state = state;
+  op->on_connectivity_state_change = closure;
+  op->bind_pollset_set = interested_parties;
+  elem = grpc_channel_stack_element(channel_stack_, 0);
+  elem->filter->start_transport_op(elem, op);
+}
+
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+                               grpc_closure* on_ack) {
+  grpc_transport_op* op = grpc_make_transport_op(nullptr);
+  grpc_channel_element* elem;
+  op->send_ping.on_initiate = on_initiate;
+  op->send_ping.on_ack = on_ack;
+  elem = grpc_channel_stack_element(channel_stack_, 0);
+  elem->filter->start_transport_op(elem, op);
+}
+
+size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
+    size_t parent_data_size) const {
+  size_t allocation_size =
+      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall));
+  if (parent_data_size > 0) {
+    allocation_size +=
+        GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
+        parent_data_size;
+  } else {
+    allocation_size += channel_stack_->call_stack_size;
+  }
+  return allocation_size;
+}
+
+//
+// SubchannelCall
+//
+
+RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
+                                                     grpc_error** error) {
+  const size_t allocation_size =
+      args.connected_subchannel->GetInitialCallSizeEstimate(
+          args.parent_data_size);
+  Arena* arena = args.arena;
+  return RefCountedPtr<SubchannelCall>(new (
+      arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
+}
+
+SubchannelCall::SubchannelCall(Args args, grpc_error** error)
+    : connected_subchannel_(std::move(args.connected_subchannel)),
+      deadline_(args.deadline) {
+  grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
+  const grpc_call_element_args call_args = {
+      callstk,           /* call_stack */
+      nullptr,           /* server_transport_data */
+      args.context,      /* context */
+      args.path,         /* path */
+      args.start_time,   /* start_time */
+      args.deadline,     /* deadline */
+      args.arena,        /* arena */
+      args.call_combiner /* call_combiner */
+  };
+  *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
+                                SubchannelCall::Destroy, this, &call_args);
+  if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
+    const char* error_string = grpc_error_string(*error);
+    gpr_log(GPR_ERROR, "error: %s", error_string);
+    return;
+  }
+  grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
+  auto* channelz_node = connected_subchannel_->channelz_subchannel();
+  if (channelz_node != nullptr) {
+    channelz_node->RecordCallStarted();
+  }
+}
+
+void SubchannelCall::StartTransportStreamOpBatch(
+    grpc_transport_stream_op_batch* batch) {
+  GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
+  MaybeInterceptRecvTrailingMetadata(batch);
+  grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
+  grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
+  GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
+  top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
+}
+
+void* SubchannelCall::GetParentData() {
+  grpc_channel_stack* chanstk = connected_subchannel_->channel_stack();
+  return (char*)this + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
+         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size);
+}
+
+grpc_call_stack* SubchannelCall::GetCallStack() {
+  return SUBCHANNEL_CALL_TO_CALL_STACK(this);
+}
+
+void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
+  GPR_ASSERT(after_call_stack_destroy_ == nullptr);
+  GPR_ASSERT(closure != nullptr);
+  after_call_stack_destroy_ = closure;
+}
+
+RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
+  IncrementRefCount();
+  return RefCountedPtr<SubchannelCall>(this);
+}
+
+RefCountedPtr<SubchannelCall> SubchannelCall::Ref(
+    const grpc_core::DebugLocation& location, const char* reason) {
+  IncrementRefCount(location, reason);
+  return RefCountedPtr<SubchannelCall>(this);
+}
+
+void SubchannelCall::Unref() {
+  GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
+}
+
+void SubchannelCall::Unref(const DebugLocation& location, const char* reason) {
+  GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
+}
+
+void SubchannelCall::Destroy(void* arg, grpc_error* error) {
+  GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
+  SubchannelCall* self = static_cast<SubchannelCall*>(arg);
+  // Keep some members before destroying the subchannel call.
+  grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel =
+      std::move(self->connected_subchannel_);
+  // Destroy the subchannel call.
+  self->~SubchannelCall();
+  // Destroy the call stack. This should be after destroying the subchannel
+  // call, because call->after_call_stack_destroy(), if not null, will free the
+  // call arena.
+  grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
+                          after_call_stack_destroy);
+  // Automatically reset connected_subchannel. This should be after destroying
+  // the call stack, because destroying call stack needs access to the channel
+  // stack.
+}
+
+void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
+    grpc_transport_stream_op_batch* batch) {
+  // only intercept payloads with recv trailing.
+  if (!batch->recv_trailing_metadata) {
+    return;
+  }
+  // only add interceptor is channelz is enabled.
+  if (connected_subchannel_->channelz_subchannel() == nullptr) {
+    return;
+  }
+  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
+                    this, grpc_schedule_on_exec_ctx);
+  // save some state needed for the interception callback.
+  GPR_ASSERT(recv_trailing_metadata_ == nullptr);
+  recv_trailing_metadata_ =
+      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+  original_recv_trailing_metadata_ =
+      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      &recv_trailing_metadata_ready_;
+}
+
+namespace {
+
+// Sets *status based on the rest of the parameters.
+void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
+                   grpc_metadata_batch* md_batch, grpc_error* error) {
+  if (error != GRPC_ERROR_NONE) {
+    grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
+  } else {
+    if (md_batch->idx.named.grpc_status != nullptr) {
+      *status = grpc_get_status_code_from_metadata(
+          md_batch->idx.named.grpc_status->md);
+    } else {
+      *status = GRPC_STATUS_UNKNOWN;
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+}  // namespace
+
+void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
+  SubchannelCall* call = static_cast<SubchannelCall*>(arg);
+  GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
+  grpc_status_code status = GRPC_STATUS_OK;
+  GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
+                GRPC_ERROR_REF(error));
+  channelz::SubchannelNode* channelz_subchannel =
+      call->connected_subchannel_->channelz_subchannel();
+  GPR_ASSERT(channelz_subchannel != nullptr);
+  if (status == GRPC_STATUS_OK) {
+    channelz_subchannel->RecordCallSucceeded();
+  } else {
+    channelz_subchannel->RecordCallFailed();
+  }
+  GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata_,
+                   GRPC_ERROR_REF(error));
+}
+
+void SubchannelCall::IncrementRefCount() {
+  GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
+}
+
+void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location,
+                                       const char* reason) {
+  GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
+}
+
+//
+// Subchannel::ConnectedSubchannelStateWatcher
+//
+
+class Subchannel::ConnectedSubchannelStateWatcher {
+ public:
+  // Must be instantiated while holding c->mu.
+  explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
+    // Steal subchannel ref for connecting.
+    GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
+    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
+    // Start watching for connectivity state changes.
+    GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
+                      grpc_schedule_on_exec_ctx);
+    c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_,
+                                                  &pending_connectivity_state_,
+                                                  &on_connectivity_changed_);
+  }
+
+  ~ConnectedSubchannelStateWatcher() {
+    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
+  }
+
+ private:
+  static void OnConnectivityChanged(void* arg, grpc_error* error) {
+    auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+    Subchannel* c = self->subchannel_;
+    {
+      MutexLock lock(&c->mu_);
+      switch (self->pending_connectivity_state_) {
+        case GRPC_CHANNEL_TRANSIENT_FAILURE:
+        case GRPC_CHANNEL_SHUTDOWN: {
+          if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
+            if (grpc_trace_subchannel.enabled()) {
+              gpr_log(GPR_INFO,
+                      "Connected subchannel %p of subchannel %p has gone into "
+                      "%s. Attempting to reconnect.",
+                      c->connected_subchannel_.get(), c,
+                      grpc_connectivity_state_name(
+                          self->pending_connectivity_state_));
+            }
+            c->connected_subchannel_.reset();
+            if (c->channelz_node() != nullptr) {
+              c->channelz_node()->SetChildSocket(nullptr);
+            }
+            c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
+            c->backoff_begun_ = false;
+            c->backoff_.Reset();
+          }
+          break;
+        }
+        default: {
+          // In principle, this should never happen.  We should not get
+          // a callback for READY, because that was the state we started
+          // this watch from.  And a connected subchannel should never go
+          // from READY to CONNECTING or IDLE.
+          c->SetConnectivityStateLocked(self->pending_connectivity_state_);
+          c->connected_subchannel_->NotifyOnStateChange(
+              nullptr, &self->pending_connectivity_state_,
+              &self->on_connectivity_changed_);
+          return;  // So we don't delete ourself below.
+        }
+      }
+    }
+    // Don't delete until we've released the lock, because this might
+    // cause the subchannel (which contains the lock) to be destroyed.
+    Delete(self);
+  }
+
+  Subchannel* subchannel_;
+  grpc_closure on_connectivity_changed_;
+  grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
+};
+
+//
+// Subchannel::ConnectivityStateWatcherList
+//
+
+void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
+    OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+  watchers_.insert(MakePair(watcher.get(), std::move(watcher)));
+}
+
+void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
+    ConnectivityStateWatcherInterface* watcher) {
+  watchers_.erase(watcher);
+}
+
+void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
+    Subchannel* subchannel, grpc_connectivity_state state) {
+  for (const auto& p : watchers_) {
+    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    if (state == GRPC_CHANNEL_READY) {
+      connected_subchannel = subchannel->connected_subchannel_;
+    }
+    // TODO(roth): In principle, it seems wrong to send this notification
+    // to the watcher while holding the subchannel's mutex, since it could
+    // lead to a deadlock if the watcher calls back into the subchannel
+    // before returning back to us.  In practice, this doesn't happen,
+    // because the LB policy code that watches subchannels always bounces
+    // the notification into the client_channel control-plane combiner
+    // before processing it.  But if we ever have any other callers here,
+    // we will probably need to change this.
+    p.second->OnConnectivityStateChange(state, std::move(connected_subchannel));
+  }
+}
+
+//
+// Subchannel::HealthWatcherMap::HealthWatcher
+//
+
+// State needed for tracking the connectivity state with a particular
+// health check service name.
+class Subchannel::HealthWatcherMap::HealthWatcher
+    : public InternallyRefCounted<HealthWatcher> {
+ public:
+  HealthWatcher(Subchannel* c, UniquePtr<char> health_check_service_name,
+                grpc_connectivity_state subchannel_state)
+      : subchannel_(c),
+        health_check_service_name_(std::move(health_check_service_name)),
+        state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
+                                                      : subchannel_state) {
+    GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher");
+    GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
+                      grpc_schedule_on_exec_ctx);
+    // If the subchannel is already connected, start health checking.
+    if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
+  }
+
+  ~HealthWatcher() {
+    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher");
+  }
+
+  const char* health_check_service_name() const {
+    return health_check_service_name_.get();
+  }
+
+  grpc_connectivity_state state() const { return state_; }
+
+  void AddWatcherLocked(
+      grpc_connectivity_state initial_state,
+      OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+    if (state_ != initial_state) {
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+      if (state_ == GRPC_CHANNEL_READY) {
+        connected_subchannel = subchannel_->connected_subchannel_;
+      }
+      watcher->OnConnectivityStateChange(state_,
+                                         std::move(connected_subchannel));
+    }
+    watcher_list_.AddWatcherLocked(std::move(watcher));
+  }
+
+  void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher) {
+    watcher_list_.RemoveWatcherLocked(watcher);
+  }
+
+  bool HasWatchers() const { return !watcher_list_.empty(); }
+
+  void NotifyLocked(grpc_connectivity_state state) {
+    if (state == GRPC_CHANNEL_READY) {
+      // If we had not already notified for CONNECTING state, do so now.
+      // (We may have missed this earlier, because if the transition
+      // from IDLE to CONNECTING to READY was too quick, the connected
+      // subchannel may not have sent us a notification for CONNECTING.)
+      if (state_ != GRPC_CHANNEL_CONNECTING) {
+        state_ = GRPC_CHANNEL_CONNECTING;
+        watcher_list_.NotifyLocked(subchannel_, state_);
+      }
+      // If we've become connected, start health checking.
+      StartHealthCheckingLocked();
+    } else {
+      state_ = state;
+      watcher_list_.NotifyLocked(subchannel_, state_);
+      // We're not connected, so stop health checking.
+      health_check_client_.reset();
+    }
+  }
+
+  void Orphan() override {
+    watcher_list_.Clear();
+    health_check_client_.reset();
+    Unref();
+  }
+
+ private:
+  void StartHealthCheckingLocked() {
+    GPR_ASSERT(health_check_client_ == nullptr);
+    health_check_client_ = MakeOrphanable<HealthCheckClient>(
+        health_check_service_name_.get(), subchannel_->connected_subchannel_,
+        subchannel_->pollset_set_, subchannel_->channelz_node_);
+    Ref().release();  // Ref for health callback tracked manually.
+    health_check_client_->NotifyOnHealthChange(&state_, &on_health_changed_);
+  }
+
+  static void OnHealthChanged(void* arg, grpc_error* error) {
+    auto* self = static_cast<HealthWatcher*>(arg);
+    Subchannel* c = self->subchannel_;
+    {
+      MutexLock lock(&c->mu_);
+      if (self->state_ != GRPC_CHANNEL_SHUTDOWN &&
+          self->health_check_client_ != nullptr) {
+        self->watcher_list_.NotifyLocked(c, self->state_);
+        // Renew watch.
+        self->health_check_client_->NotifyOnHealthChange(
+            &self->state_, &self->on_health_changed_);
+        return;  // So we don't unref below.
+      }
+    }
+    // Don't unref until we've released the lock, because this might
+    // cause the subchannel (which contains the lock) to be destroyed.
+    self->Unref();
+  }
+
+  Subchannel* subchannel_;
+  UniquePtr<char> health_check_service_name_;
+  OrphanablePtr<HealthCheckClient> health_check_client_;
+  grpc_closure on_health_changed_;
+  grpc_connectivity_state state_;
+  ConnectivityStateWatcherList watcher_list_;
+};
+
+//
+// Subchannel::HealthWatcherMap
+//
+
+void Subchannel::HealthWatcherMap::AddWatcherLocked(
+    Subchannel* subchannel, grpc_connectivity_state initial_state,
+    UniquePtr<char> health_check_service_name,
+    OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+  // If the health check service name is not already present in the map,
+  // add it.
+  auto it = map_.find(health_check_service_name.get());
+  HealthWatcher* health_watcher;
+  if (it == map_.end()) {
+    const char* key = health_check_service_name.get();
+    auto w = MakeOrphanable<HealthWatcher>(
+        subchannel, std::move(health_check_service_name), subchannel->state_);
+    health_watcher = w.get();
+    map_[key] = std::move(w);
+  } else {
+    health_watcher = it->second.get();
+  }
+  // Add the watcher to the entry.
+  health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
+}
+
+void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
+    const char* health_check_service_name,
+    ConnectivityStateWatcherInterface* watcher) {
+  auto it = map_.find(health_check_service_name);
+  GPR_ASSERT(it != map_.end());
+  it->second->RemoveWatcherLocked(watcher);
+  // If we just removed the last watcher for this service name, remove
+  // the map entry.
+  if (!it->second->HasWatchers()) map_.erase(it);
+}
+
+void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state) {
+  for (const auto& p : map_) {
+    p.second->NotifyLocked(state);
+  }
+}
+
+grpc_connectivity_state
+Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
+    Subchannel* subchannel, const char* health_check_service_name) {
+  auto it = map_.find(health_check_service_name);
+  if (it == map_.end()) {
+    // If the health check service name is not found in the map, we're
+    // not currently doing a health check for that service name.  If the
+    // subchannel's state without health checking is READY, report
+    // CONNECTING, since that's what we'd be in as soon as we do start a
+    // watch.  Otherwise, report the channel's state without health checking.
+    return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
+                                                    : subchannel->state_;
+  }
+  HealthWatcher* health_watcher = it->second.get();
+  return health_watcher->state();
+}
+
+void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
+
+//
+// Subchannel
+//
+
+namespace {
+
+BackOff::Options ParseArgsForBackoffValues(
+    const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) {
+  grpc_millis initial_backoff_ms =
+      GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+  *min_connect_timeout_ms =
+      GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
+  grpc_millis max_backoff_ms =
+      GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+  bool fixed_reconnect_backoff = false;
+  if (args != nullptr) {
+    for (size_t i = 0; i < args->num_args; i++) {
+      if (0 == strcmp(args->args[i].key,
+                      "grpc.testing.fixed_reconnect_backoff_ms")) {
+        fixed_reconnect_backoff = true;
+        initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
+            grpc_channel_arg_get_integer(
+                &args->args[i],
+                {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+      } else if (0 ==
+                 strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+        fixed_reconnect_backoff = false;
+        *min_connect_timeout_ms = grpc_channel_arg_get_integer(
+            &args->args[i],
+            {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
+      } else if (0 ==
+                 strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+        fixed_reconnect_backoff = false;
+        max_backoff_ms = grpc_channel_arg_get_integer(
+            &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
+      } else if (0 == strcmp(args->args[i].key,
+                             GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+        fixed_reconnect_backoff = false;
+        initial_backoff_ms = grpc_channel_arg_get_integer(
+            &args->args[i],
+            {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+      }
+    }
+  }
+  return BackOff::Options()
+      .set_initial_backoff(initial_backoff_ms)
+      .set_multiplier(fixed_reconnect_backoff
+                          ? 1.0
+                          : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
+      .set_jitter(fixed_reconnect_backoff ? 0.0
+                                          : GRPC_SUBCHANNEL_RECONNECT_JITTER)
+      .set_max_backoff(max_backoff_ms);
+}
+
+}  // namespace
+
+Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
+                       const grpc_channel_args* args)
+    : key_(key),
+      connector_(connector),
+      backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
+  GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
+  gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS);
+  grpc_connector_ref(connector_);
+  pollset_set_ = grpc_pollset_set_create();
+  grpc_resolved_address* addr =
+      static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
+  GetAddressFromSubchannelAddressArg(args, addr);
+  grpc_resolved_address* new_address = nullptr;
+  grpc_channel_args* new_args = nullptr;
+  if (grpc_proxy_mappers_map_address(addr, args, &new_address, &new_args)) {
+    GPR_ASSERT(new_address != nullptr);
+    gpr_free(addr);
+    addr = new_address;
+  }
+  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+  grpc_arg new_arg = CreateSubchannelAddressArg(addr);
+  gpr_free(addr);
+  args_ = grpc_channel_args_copy_and_add_and_remove(
+      new_args != nullptr ? new_args : args, keys_to_remove,
+      GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
+  gpr_free(new_arg.value.string);
+  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
+  GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
+                    grpc_schedule_on_exec_ctx);
+  const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
+  const bool channelz_enabled =
+      grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
+  arg = grpc_channel_args_find(
+      args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
+  const grpc_integer_options options = {
+      GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
+  size_t channel_tracer_max_memory =
+      (size_t)grpc_channel_arg_get_integer(arg, options);
+  if (channelz_enabled) {
+    channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
+        GetTargetAddress(), channel_tracer_max_memory);
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string("subchannel created"));
+  }
+}
+
+Subchannel::~Subchannel() {
+  if (channelz_node_ != nullptr) {
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string("Subchannel destroyed"));
+    channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
+  }
+  grpc_channel_args_destroy(args_);
+  grpc_connector_unref(connector_);
+  grpc_pollset_set_destroy(pollset_set_);
+  Delete(key_);
+}
+
+Subchannel* Subchannel::Create(grpc_connector* connector,
+                               const grpc_channel_args* args) {
+  SubchannelKey* key = New<SubchannelKey>(args);
+  SubchannelPoolInterface* subchannel_pool =
+      SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
+  GPR_ASSERT(subchannel_pool != nullptr);
+  Subchannel* c = subchannel_pool->FindSubchannel(key);
+  if (c != nullptr) {
+    Delete(key);
+    return c;
+  }
+  c = New<Subchannel>(key, connector, args);
+  // Try to register the subchannel before setting the subchannel pool.
+  // Otherwise, in case of a registration race, unreffing c in
+  // RegisterSubchannel() will cause c to be tried to be unregistered, while
+  // its key maps to a different subchannel.
+  Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c);
+  if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
+  return registered;
+}
+
+Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+  gpr_atm old_refs;
+  old_refs = RefMutate((1 << INTERNAL_REF_BITS),
+                       0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF"));
+  GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
+  return this;
+}
+
+void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+  gpr_atm old_refs;
+  // add a weak ref and subtract a strong ref (atomically)
+  old_refs = RefMutate(
+      static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
+      1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF"));
+  if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
+    Disconnect();
+  }
+  GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref");
+}
+
+Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+  gpr_atm old_refs;
+  old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF"));
+  GPR_ASSERT(old_refs != 0);
+  return this;
+}
+
+namespace {
+
+void subchannel_destroy(void* arg, grpc_error* error) {
+  Subchannel* self = static_cast<Subchannel*>(arg);
+  Delete(self);
+}
+
+}  // namespace
+
+void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+  gpr_atm old_refs;
+  old_refs = RefMutate(-static_cast<gpr_atm>(1),
+                       1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF"));
+  if (old_refs == 1) {
+    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this,
+                                           grpc_schedule_on_exec_ctx),
+                       GRPC_ERROR_NONE);
+  }
+}
+
+Subchannel* Subchannel::RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+  for (;;) {
+    gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_);
+    if (old_refs >= (1 << INTERNAL_REF_BITS)) {
+      gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
+      if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) {
+        return this;
+      }
+    } else {
+      return nullptr;
+    }
+  }
+}
+
+const char* Subchannel::GetTargetAddress() {
+  const grpc_arg* addr_arg =
+      grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS);
+  const char* addr_str = grpc_channel_arg_get_string(addr_arg);
+  GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
+  return addr_str;
+}
+
+channelz::SubchannelNode* Subchannel::channelz_node() {
+  return channelz_node_.get();
+}
+
+grpc_connectivity_state Subchannel::CheckConnectivityState(
+    const char* health_check_service_name,
+    RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
+  MutexLock lock(&mu_);
+  grpc_connectivity_state state;
+  if (health_check_service_name == nullptr) {
+    state = state_;
+  } else {
+    state = health_watcher_map_.CheckConnectivityStateLocked(
+        this, health_check_service_name);
+  }
+  if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
+    *connected_subchannel = connected_subchannel_;
+  }
+  return state;
+}
+
+void Subchannel::WatchConnectivityState(
+    grpc_connectivity_state initial_state,
+    UniquePtr<char> health_check_service_name,
+    OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
+  MutexLock lock(&mu_);
+  grpc_pollset_set* interested_parties = watcher->interested_parties();
+  if (interested_parties != nullptr) {
+    grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
+  }
+  if (health_check_service_name == nullptr) {
+    if (state_ != initial_state) {
+      watcher->OnConnectivityStateChange(state_, connected_subchannel_);
+    }
+    watcher_list_.AddWatcherLocked(std::move(watcher));
+  } else {
+    health_watcher_map_.AddWatcherLocked(this, initial_state,
+                                         std::move(health_check_service_name),
+                                         std::move(watcher));
+  }
+}
+
+void Subchannel::CancelConnectivityStateWatch(
+    const char* health_check_service_name,
+    ConnectivityStateWatcherInterface* watcher) {
+  MutexLock lock(&mu_);
+  grpc_pollset_set* interested_parties = watcher->interested_parties();
+  if (interested_parties != nullptr) {
+    grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
+  }
+  if (health_check_service_name == nullptr) {
+    watcher_list_.RemoveWatcherLocked(watcher);
+  } else {
+    health_watcher_map_.RemoveWatcherLocked(health_check_service_name, watcher);
+  }
+}
+
+void Subchannel::AttemptToConnect() {
+  MutexLock lock(&mu_);
+  MaybeStartConnectingLocked();
+}
+
+void Subchannel::ResetBackoff() {
+  MutexLock lock(&mu_);
+  backoff_.Reset();
+  if (have_retry_alarm_) {
+    retry_immediately_ = true;
+    grpc_timer_cancel(&retry_alarm_);
+  } else {
+    backoff_begun_ = false;
+    MaybeStartConnectingLocked();
+  }
+}
+
+grpc_arg Subchannel::CreateSubchannelAddressArg(
+    const grpc_resolved_address* addr) {
+  return grpc_channel_arg_string_create(
+      (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
+      addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
+}
+
+const char* Subchannel::GetUriFromSubchannelAddressArg(
+    const grpc_channel_args* args) {
+  const grpc_arg* addr_arg =
+      grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
+  const char* addr_str = grpc_channel_arg_get_string(addr_arg);
+  GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
+  return addr_str;
+}
+
+namespace {
+
+void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) {
+  grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
+  GPR_ASSERT(uri != nullptr);
+  if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
+  grpc_uri_destroy(uri);
+}
+
+}  // namespace
+
+void Subchannel::GetAddressFromSubchannelAddressArg(
+    const grpc_channel_args* args, grpc_resolved_address* addr) {
+  const char* addr_uri_str = GetUriFromSubchannelAddressArg(args);
+  memset(addr, 0, sizeof(*addr));
+  if (*addr_uri_str != '\0') {
+    UriToSockaddr(addr_uri_str, addr);
+  }
+}
+
+namespace {
+
+// Returns a string indicating the subchannel's connectivity state change to
+// \a state.
+const char* SubchannelConnectivityStateChangeString(
+    grpc_connectivity_state state) {
+  switch (state) {
+    case GRPC_CHANNEL_IDLE:
+      return "Subchannel state change to IDLE";
+    case GRPC_CHANNEL_CONNECTING:
+      return "Subchannel state change to CONNECTING";
+    case GRPC_CHANNEL_READY:
+      return "Subchannel state change to READY";
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      return "Subchannel state change to TRANSIENT_FAILURE";
+    case GRPC_CHANNEL_SHUTDOWN:
+      return "Subchannel state change to SHUTDOWN";
+  }
+  GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+}  // namespace
+
+// Note: Must be called with a state that is different from the current state.
+void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state) {
+  state_ = state;
+  if (channelz_node_ != nullptr) {
+    channelz_node_->UpdateConnectivityState(state);
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string(
+            SubchannelConnectivityStateChangeString(state)));
+  }
+  // Notify non-health watchers.
+  watcher_list_.NotifyLocked(this, state);
+  // Notify health watchers.
+  health_watcher_map_.NotifyLocked(state);
+}
+
+void Subchannel::MaybeStartConnectingLocked() {
+  if (disconnected_) {
+    // Don't try to connect if we're already disconnected.
+    return;
+  }
+  if (connecting_) {
+    // Already connecting: don't restart.
+    return;
+  }
+  if (connected_subchannel_ != nullptr) {
+    // Already connected: don't restart.
+    return;
+  }
+  connecting_ = true;
+  GRPC_SUBCHANNEL_WEAK_REF(this, "connecting");
+  if (!backoff_begun_) {
+    backoff_begun_ = true;
+    ContinueConnectingLocked();
+  } else {
+    GPR_ASSERT(!have_retry_alarm_);
+    have_retry_alarm_ = true;
+    const grpc_millis time_til_next =
+        next_attempt_deadline_ - ExecCtx::Get()->Now();
+    if (time_til_next <= 0) {
+      gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this);
+    } else {
+      gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds",
+              this, time_til_next);
+    }
+    GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
+                      grpc_schedule_on_exec_ctx);
+    grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
+  }
+}
+
+void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
+  Subchannel* c = static_cast<Subchannel*>(arg);
+  // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use
+  //                 MutexLock instead of ReleasableMutexLock, here.
+  ReleasableMutexLock lock(&c->mu_);
+  c->have_retry_alarm_ = false;
+  if (c->disconnected_) {
+    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
+                                                             &error, 1);
+  } else if (c->retry_immediately_) {
+    c->retry_immediately_ = false;
+    error = GRPC_ERROR_NONE;
+  } else {
+    GRPC_ERROR_REF(error);
+  }
+  if (error == GRPC_ERROR_NONE) {
+    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+    c->ContinueConnectingLocked();
+    lock.Unlock();
+  } else {
+    lock.Unlock();
+    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+void Subchannel::ContinueConnectingLocked() {
+  grpc_connect_in_args args;
+  args.interested_parties = pollset_set_;
+  const grpc_millis min_deadline =
+      min_connect_timeout_ms_ + ExecCtx::Get()->Now();
+  next_attempt_deadline_ = backoff_.NextAttemptTime();
+  args.deadline = std::max(next_attempt_deadline_, min_deadline);
+  args.channel_args = args_;
+  SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING);
+  grpc_connector_connect(connector_, &args, &connecting_result_,
+                         &on_connecting_finished_);
+}
+
+void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
+  auto* c = static_cast<Subchannel*>(arg);
+  grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args;
+  GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
+  {
+    MutexLock lock(&c->mu_);
+    c->connecting_ = false;
+    if (c->connecting_result_.transport != nullptr &&
+        c->PublishTransportLocked()) {
+      // Do nothing, transport was published.
+    } else if (c->disconnected_) {
+      GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
+    } else {
+      gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
+      c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
+      GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
+    }
+  }
+  GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
+  grpc_channel_args_destroy(delete_channel_args);
+}
+
+namespace {
+
+void ConnectionDestroy(void* arg, grpc_error* error) {
+  grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
+  grpc_channel_stack_destroy(stk);
+  gpr_free(stk);
+}
+
+}  // namespace
+
+bool Subchannel::PublishTransportLocked() {
+  // Construct channel stack.
+  grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
+  grpc_channel_stack_builder_set_channel_arguments(
+      builder, connecting_result_.channel_args);
+  grpc_channel_stack_builder_set_transport(builder,
+                                           connecting_result_.transport);
+  if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
+    grpc_channel_stack_builder_destroy(builder);
+    return false;
+  }
+  grpc_channel_stack* stk;
+  grpc_error* error = grpc_channel_stack_builder_finish(
+      builder, 0, 1, ConnectionDestroy, nullptr,
+      reinterpret_cast<void**>(&stk));
+  if (error != GRPC_ERROR_NONE) {
+    grpc_transport_destroy(connecting_result_.transport);
+    gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
+            grpc_error_string(error));
+    GRPC_ERROR_UNREF(error);
+    return false;
+  }
+  RefCountedPtr<channelz::SocketNode> socket =
+      std::move(connecting_result_.socket);
+  connecting_result_.reset();
+  if (disconnected_) {
+    grpc_channel_stack_destroy(stk);
+    gpr_free(stk);
+    return false;
+  }
+  // Publish.
+  connected_subchannel_.reset(
+      New<ConnectedSubchannel>(stk, args_, channelz_node_));
+  gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
+          connected_subchannel_.get(), this);
+  if (channelz_node_ != nullptr) {
+    channelz_node_->SetChildSocket(std::move(socket));
+  }
+  // Instantiate state watcher.  Will clean itself up.
+  New<ConnectedSubchannelStateWatcher>(this);
+  // Report initial state.
+  SetConnectivityStateLocked(GRPC_CHANNEL_READY);
+  return true;
+}
+
+void Subchannel::Disconnect() {
+  // The subchannel_pool is only used once here in this subchannel, so the
+  // access can be outside of the lock.
+  if (subchannel_pool_ != nullptr) {
+    subchannel_pool_->UnregisterSubchannel(key_);
+    subchannel_pool_.reset();
+  }
+  MutexLock lock(&mu_);
+  GPR_ASSERT(!disconnected_);
+  disconnected_ = true;
+  grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                                          "Subchannel disconnected"));
+  connected_subchannel_.reset();
+  health_watcher_map_.ShutdownLocked();
+}
+
+gpr_atm Subchannel::RefMutate(
+    gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) {
+  gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta)
+                            : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta);
+#ifndef NDEBUG
+  if (grpc_trace_subchannel_refcount.enabled()) {
+    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+            "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this,
+            purpose, old_val, old_val + delta, reason);
+  }
+#endif
+  return old_val;
+}
+
+}  // namespace grpc_core