Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / subchannel.h
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/subchannel.h b/legacy-libs/grpc/deps/grpc/src/core/ext/filters/client_channel/subchannel.h
new file mode 100644 (file)
index 0000000..c178401
--- /dev/null
@@ -0,0 +1,406 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/ext/filters/client_channel/connector.h"
+#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr/time_precise.h"
+#include "src/core/lib/gprpp/arena.h"
+#include "src/core/lib/gprpp/map.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
+
+// Channel arg containing a grpc_resolved_address to connect to.
+#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
+
+// For debugging refcounting.
+#ifndef NDEBUG
+#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
+  (p)->RefFromWeakRef(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
+  const char *file, int line, const char *reason
+#define GRPC_SUBCHANNEL_REF_REASON reason
+#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
+  , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
+#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
+#else
+#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
+#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
+#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
+#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
+#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
+#define GRPC_SUBCHANNEL_REF_REASON ""
+#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
+#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
+#endif
+
+namespace grpc_core {
+
+class SubchannelCall;
+
+class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
+ public:
+  ConnectedSubchannel(
+      grpc_channel_stack* channel_stack, const grpc_channel_args* args,
+      RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
+  ~ConnectedSubchannel();
+
+  void NotifyOnStateChange(grpc_pollset_set* interested_parties,
+                           grpc_connectivity_state* state,
+                           grpc_closure* closure);
+  void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
+
+  grpc_channel_stack* channel_stack() const { return channel_stack_; }
+  const grpc_channel_args* args() const { return args_; }
+  channelz::SubchannelNode* channelz_subchannel() const {
+    return channelz_subchannel_.get();
+  }
+
+  size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
+
+ private:
+  grpc_channel_stack* channel_stack_;
+  grpc_channel_args* args_;
+  // ref counted pointer to the channelz node in this connected subchannel's
+  // owning subchannel.
+  RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
+};
+
+// Implements the interface of RefCounted<>.
+class SubchannelCall {
+ public:
+  struct Args {
+    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+    grpc_polling_entity* pollent;
+    grpc_slice path;
+    gpr_cycle_counter start_time;
+    grpc_millis deadline;
+    Arena* arena;
+    grpc_call_context_element* context;
+    CallCombiner* call_combiner;
+    size_t parent_data_size;
+  };
+  static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
+
+  // Continues processing a transport stream op batch.
+  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
+
+  // Returns a pointer to the parent data associated with the subchannel call.
+  // The data will be of the size specified in \a parent_data_size field of
+  // the args passed to \a ConnectedSubchannel::CreateCall().
+  void* GetParentData();
+
+  // Returns the call stack of the subchannel call.
+  grpc_call_stack* GetCallStack();
+
+  // Sets the 'then_schedule_closure' argument for call stack destruction.
+  // Must be called once per call.
+  void SetAfterCallStackDestroy(grpc_closure* closure);
+
+  // Interface of RefCounted<>.
+  RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
+  RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
+                                    const char* reason) GRPC_MUST_USE_RESULT;
+  // When refcount drops to 0, destroys itself and the associated call stack,
+  // but does NOT free the memory because it's in the call arena.
+  void Unref();
+  void Unref(const DebugLocation& location, const char* reason);
+
+  static void Destroy(void* arg, grpc_error* error);
+
+ private:
+  // Allow RefCountedPtr<> to access IncrementRefCount().
+  template <typename T>
+  friend class RefCountedPtr;
+
+  SubchannelCall(Args args, grpc_error** error);
+
+  // If channelz is enabled, intercepts recv_trailing so that we may check the
+  // status and associate it to a subchannel.
+  void MaybeInterceptRecvTrailingMetadata(
+      grpc_transport_stream_op_batch* batch);
+
+  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
+
+  // Interface of RefCounted<>.
+  void IncrementRefCount();
+  void IncrementRefCount(const DebugLocation& location, const char* reason);
+
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+  grpc_closure* after_call_stack_destroy_ = nullptr;
+  // State needed to support channelz interception of recv trailing metadata.
+  grpc_closure recv_trailing_metadata_ready_;
+  grpc_closure* original_recv_trailing_metadata_ = nullptr;
+  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
+  grpc_millis deadline_;
+};
+
+// A subchannel that knows how to connect to exactly one target address. It
+// provides a target for load balancing.
+//
+// Note that this is the "real" subchannel implementation, whose API is
+// different from the SubchannelInterface that is exposed to LB policy
+// implementations.  The client channel provides an adaptor class
+// (SubchannelWrapper) that "converts" between the two.
+class Subchannel {
+ public:
+  class ConnectivityStateWatcherInterface
+      : public InternallyRefCounted<ConnectivityStateWatcherInterface> {
+   public:
+    virtual ~ConnectivityStateWatcherInterface() = default;
+
+    // Will be invoked whenever the subchannel's connectivity state
+    // changes.  There will be only one invocation of this method on a
+    // given watcher instance at any given time.
+    //
+    // When the state changes to READY, connected_subchannel will
+    // contain a ref to the connected subchannel.  When it changes from
+    // READY to some other state, the implementation must release its
+    // ref to the connected subchannel.
+    virtual void OnConnectivityStateChange(
+        grpc_connectivity_state new_state,
+        RefCountedPtr<ConnectedSubchannel> connected_subchannel)  // NOLINT
+        GRPC_ABSTRACT;
+
+    virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
+
+    GRPC_ABSTRACT_BASE_CLASS
+  };
+
+  // The ctor and dtor are not intended to use directly.
+  Subchannel(SubchannelKey* key, grpc_connector* connector,
+             const grpc_channel_args* args);
+  ~Subchannel();
+
+  // Creates a subchannel given \a connector and \a args.
+  static Subchannel* Create(grpc_connector* connector,
+                            const grpc_channel_args* args);
+
+  // Strong and weak refcounting.
+  Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+  void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+  Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+  void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+  // Attempts to return a strong ref when only the weak refcount is guaranteed
+  // non-zero. If the strong refcount is zero, does not alter the refcount and
+  // returns null.
+  Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+
+  // Gets the string representing the subchannel address.
+  // Caller doesn't take ownership.
+  const char* GetTargetAddress();
+
+  const grpc_channel_args* channel_args() const { return args_; }
+
+  channelz::SubchannelNode* channelz_node();
+
+  // Returns the current connectivity state of the subchannel.
+  // If health_check_service_name is non-null, the returned connectivity
+  // state will be based on the state reported by the backend for that
+  // service name.
+  // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
+  grpc_connectivity_state CheckConnectivityState(
+      const char* health_check_service_name,
+      RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
+
+  // Starts watching the subchannel's connectivity state.
+  // The first callback to the watcher will be delivered when the
+  // subchannel's connectivity state becomes a value other than
+  // initial_state, which may happen immediately.
+  // Subsequent callbacks will be delivered as the subchannel's state
+  // changes.
+  // The watcher will be destroyed either when the subchannel is
+  // destroyed or when CancelConnectivityStateWatch() is called.
+  void WatchConnectivityState(
+      grpc_connectivity_state initial_state,
+      UniquePtr<char> health_check_service_name,
+      OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+
+  // Cancels a connectivity state watch.
+  // If the watcher has already been destroyed, this is a no-op.
+  void CancelConnectivityStateWatch(const char* health_check_service_name,
+                                    ConnectivityStateWatcherInterface* watcher);
+
+  // Attempt to connect to the backend.  Has no effect if already connected.
+  void AttemptToConnect();
+
+  // Resets the connection backoff of the subchannel.
+  // TODO(roth): Move connection backoff out of subchannels and up into LB
+  // policy code (probably by adding a SubchannelGroup between
+  // SubchannelList and SubchannelData), at which point this method can
+  // go away.
+  void ResetBackoff();
+
+  // Returns a new channel arg encoding the subchannel address as a URI
+  // string. Caller is responsible for freeing the string.
+  static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
+
+  // Returns the URI string from the subchannel address arg in \a args.
+  static const char* GetUriFromSubchannelAddressArg(
+      const grpc_channel_args* args);
+
+  // Sets \a addr from the subchannel address arg in \a args.
+  static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
+                                                 grpc_resolved_address* addr);
+
+ private:
+  // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
+  // the subchannel's state.
+  class ConnectivityStateWatcherList {
+   public:
+    ~ConnectivityStateWatcherList() { Clear(); }
+
+    void AddWatcherLocked(
+        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+    void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
+
+    // Notifies all watchers in the list about a change to state.
+    void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
+
+    void Clear() { watchers_.clear(); }
+
+    bool empty() const { return watchers_.empty(); }
+
+   private:
+    // TODO(roth): This could be a set instead of a map if we had a set
+    // implementation.
+    Map<ConnectivityStateWatcherInterface*,
+        OrphanablePtr<ConnectivityStateWatcherInterface>>
+        watchers_;
+  };
+
+  // A map that tracks ConnectivityStateWatcherInterfaces using a particular
+  // health check service name.
+  //
+  // There is one entry in the map for each health check service name.
+  // Entries exist only as long as there are watchers using the
+  // corresponding service name.
+  //
+  // A health check client is maintained only while the subchannel is in
+  // state READY.
+  class HealthWatcherMap {
+   public:
+    void AddWatcherLocked(
+        Subchannel* subchannel, grpc_connectivity_state initial_state,
+        UniquePtr<char> health_check_service_name,
+        OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
+    void RemoveWatcherLocked(const char* health_check_service_name,
+                             ConnectivityStateWatcherInterface* watcher);
+
+    // Notifies the watcher when the subchannel's state changes.
+    void NotifyLocked(grpc_connectivity_state state);
+
+    grpc_connectivity_state CheckConnectivityStateLocked(
+        Subchannel* subchannel, const char* health_check_service_name);
+
+    void ShutdownLocked();
+
+   private:
+    class HealthWatcher;
+
+    Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
+  };
+
+  class ConnectedSubchannelStateWatcher;
+
+  // Sets the subchannel's connectivity state to \a state.
+  void SetConnectivityStateLocked(grpc_connectivity_state state);
+
+  // Methods for connection.
+  void MaybeStartConnectingLocked();
+  static void OnRetryAlarm(void* arg, grpc_error* error);
+  void ContinueConnectingLocked();
+  static void OnConnectingFinished(void* arg, grpc_error* error);
+  bool PublishTransportLocked();
+  void Disconnect();
+
+  gpr_atm RefMutate(gpr_atm delta,
+                    int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
+
+  // The subchannel pool this subchannel is in.
+  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
+  // TODO(juanlishen): Consider using args_ as key_ directly.
+  // Subchannel key that identifies this subchannel in the subchannel pool.
+  SubchannelKey* key_;
+  // Channel args.
+  grpc_channel_args* args_;
+  // pollset_set tracking who's interested in a connection being setup.
+  grpc_pollset_set* pollset_set_;
+  // Protects the other members.
+  Mutex mu_;
+  // Refcount
+  //    - lower INTERNAL_REF_BITS bits are for internal references:
+  //      these do not keep the subchannel open.
+  //    - upper remaining bits are for public references: these do
+  //      keep the subchannel open
+  gpr_atm ref_pair_;
+
+  // Connection states.
+  grpc_connector* connector_ = nullptr;
+  // Set during connection.
+  grpc_connect_out_args connecting_result_;
+  grpc_closure on_connecting_finished_;
+  // Active connection, or null.
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+  bool connecting_ = false;
+  bool disconnected_ = false;
+
+  // Connectivity state tracking.
+  grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
+  // The list of watchers without a health check service name.
+  ConnectivityStateWatcherList watcher_list_;
+  // The map of watchers with health check service names.
+  HealthWatcherMap health_watcher_map_;
+
+  // Backoff state.
+  BackOff backoff_;
+  grpc_millis next_attempt_deadline_;
+  grpc_millis min_connect_timeout_ms_;
+  bool backoff_begun_ = false;
+
+  // Retry alarm.
+  grpc_timer retry_alarm_;
+  grpc_closure on_retry_alarm_;
+  bool have_retry_alarm_ = false;
+  // reset_backoff() was called while alarm was pending.
+  bool retry_immediately_ = false;
+
+  // Channelz tracking.
+  RefCountedPtr<channelz::SubchannelNode> channelz_node_;
+};
+
+}  // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */