Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / lib / iomgr / resource_quota.cc
diff --git a/legacy-libs/grpc-cloned/deps/grpc/src/core/lib/iomgr/resource_quota.cc b/legacy-libs/grpc-cloned/deps/grpc/src/core/lib/iomgr/resource_quota.cc
new file mode 100644 (file)
index 0000000..dffac34
--- /dev/null
@@ -0,0 +1,1006 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/resource_quota.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
+
+#define MEMORY_USAGE_ESTIMATION_MAX 65536
+
+/* Internal linked list pointers for a resource user */
+typedef struct {
+  grpc_resource_user* next;
+  grpc_resource_user* prev;
+} grpc_resource_user_link;
+
+/* Resource users are kept in (potentially) several intrusive linked lists
+   at once. These are the list names. */
+typedef enum {
+  /* Resource users that are waiting for an allocation */
+  GRPC_RULIST_AWAITING_ALLOCATION,
+  /* Resource users that have free memory available for internal reclamation */
+  GRPC_RULIST_NON_EMPTY_FREE_POOL,
+  /* Resource users that have published a benign reclamation is available */
+  GRPC_RULIST_RECLAIMER_BENIGN,
+  /* Resource users that have published a destructive reclamation is
+     available */
+  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
+  /* Number of lists: must be last */
+  GRPC_RULIST_COUNT
+} grpc_rulist;
+
+struct grpc_resource_user {
+  /* The quota this resource user consumes from */
+  grpc_resource_quota* resource_quota;
+
+  /* Closure to schedule an allocation under the resource quota combiner lock */
+  grpc_closure allocate_closure;
+  /* Closure to publish a non empty free pool under the resource quota combiner
+     lock */
+  grpc_closure add_to_free_pool_closure;
+
+  /* one ref for each ref call (released by grpc_resource_user_unref), and one
+     ref for each byte allocated (released by grpc_resource_user_free) */
+  gpr_atm refs;
+  /* is this resource user unlocked? starts at 0, increases for each shutdown
+     call */
+  gpr_atm shutdown;
+
+  gpr_mu mu;
+  /* The amount of memory (in bytes) this user has cached for its own use: to
+     avoid quota contention, each resource user can keep some memory in
+     addition to what it is immediately using (e.g., for caching), and the quota
+     can pull it back under memory pressure.
+     This value can become negative if more memory has been requested than
+     existed in the free pool, at which point the quota is consulted to bring
+     this value non-negative (asynchronously). */
+  int64_t free_pool;
+  /* A list of closures to call once free_pool becomes non-negative - ie when
+     all outstanding allocations have been granted. */
+  grpc_closure_list on_allocated;
+  /* True if we are currently trying to allocate from the quota, false if not */
+  bool allocating;
+  /* The amount of memory (in bytes) that has been requested from this user
+   * asynchronously but hasn't been granted yet. */
+  int64_t outstanding_allocations;
+  /* True if we are currently trying to add ourselves to the non-free quota
+     list, false otherwise */
+  bool added_to_free_pool;
+
+  /* The number of threads currently allocated to this resource user */
+  gpr_atm num_threads_allocated;
+
+  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
+   */
+  grpc_closure* reclaimers[2];
+  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
+     to the array above */
+  grpc_closure* new_reclaimers[2];
+  /* Trampoline closures to finish reclamation and re-enter the quota combiner
+     lock */
+  grpc_closure post_reclaimer_closure[2];
+
+  /* Closure to execute under the quota combiner to de-register and shutdown the
+     resource user */
+  grpc_closure destroy_closure;
+
+  /* Links in the various grpc_rulist lists */
+  grpc_resource_user_link links[GRPC_RULIST_COUNT];
+
+  /* The name of this resource user, for debugging/tracing */
+  char* name;
+};
+
+struct grpc_resource_quota {
+  /* refcount */
+  gpr_refcount refs;
+
+  /* estimate of current memory usage
+     scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
+  gpr_atm memory_usage_estimation;
+
+  /* Master combiner lock: all activity on a quota executes under this combiner
+   * (so no mutex is needed for this data structure) */
+  grpc_combiner* combiner;
+  /* Size of the resource quota */
+  int64_t size;
+  /* Amount of free memory in the resource quota */
+  int64_t free_pool;
+  /* Used size of memory in the resource quota. Updated as soon as the resource
+   * users start to allocate or free the memory. */
+  gpr_atm used;
+
+  gpr_atm last_size;
+
+  /* Mutex to protect max_threads and num_threads_allocated */
+  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
+   * and avoid having this mutex; but in that case, each invocation of the
+   * function grpc_resource_user_allocate_threads() would have had to do at
+   * least two atomic loads (for max_threads and num_threads_allocated) followed
+   * by a CAS (on num_threads_allocated).
+   * Moreover, we expect grpc_resource_user_allocate_threads() to be often
+   * called concurrently thereby increasing the chances of failing the CAS
+   * operation. This additional complexity is not worth the tiny perf gain we
+   * may (or may not) have by using atomics */
+  gpr_mu thread_count_mu;
+
+  /* Max number of threads allowed */
+  int max_threads;
+
+  /* Number of threads currently allocated via this resource_quota object */
+  int num_threads_allocated;
+
+  /* Has rq_step been scheduled to occur? */
+  bool step_scheduled;
+
+  /* Are we currently reclaiming memory */
+  bool reclaiming;
+
+  /* Closure around rq_step */
+  grpc_closure rq_step_closure;
+
+  /* Closure around rq_reclamation_done */
+  grpc_closure rq_reclamation_done_closure;
+
+  /* This is only really usable for debugging: it's always a stale pointer, but
+     a stale pointer that might just be fresh enough to guide us to where the
+     reclamation system is stuck */
+  grpc_closure* debug_only_last_initiated_reclaimer;
+  grpc_resource_user* debug_only_last_reclaimer_resource_user;
+
+  /* Roots of all resource user lists */
+  grpc_resource_user* roots[GRPC_RULIST_COUNT];
+
+  char* name;
+};
+
+static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
+
+/*******************************************************************************
+ * list management
+ */
+
+static void rulist_add_head(grpc_resource_user* resource_user,
+                            grpc_rulist list) {
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  grpc_resource_user** root = &resource_quota->roots[list];
+  if (*root == nullptr) {
+    *root = resource_user;
+    resource_user->links[list].next = resource_user->links[list].prev =
+        resource_user;
+  } else {
+    resource_user->links[list].next = *root;
+    resource_user->links[list].prev = (*root)->links[list].prev;
+    resource_user->links[list].next->links[list].prev =
+        resource_user->links[list].prev->links[list].next = resource_user;
+    *root = resource_user;
+  }
+}
+
+static void rulist_add_tail(grpc_resource_user* resource_user,
+                            grpc_rulist list) {
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  grpc_resource_user** root = &resource_quota->roots[list];
+  if (*root == nullptr) {
+    *root = resource_user;
+    resource_user->links[list].next = resource_user->links[list].prev =
+        resource_user;
+  } else {
+    resource_user->links[list].next = (*root)->links[list].next;
+    resource_user->links[list].prev = *root;
+    resource_user->links[list].next->links[list].prev =
+        resource_user->links[list].prev->links[list].next = resource_user;
+  }
+}
+
+static bool rulist_empty(grpc_resource_quota* resource_quota,
+                         grpc_rulist list) {
+  return resource_quota->roots[list] == nullptr;
+}
+
+static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
+                                           grpc_rulist list) {
+  grpc_resource_user** root = &resource_quota->roots[list];
+  grpc_resource_user* resource_user = *root;
+  if (resource_user == nullptr) {
+    return nullptr;
+  }
+  if (resource_user->links[list].next == resource_user) {
+    *root = nullptr;
+  } else {
+    resource_user->links[list].next->links[list].prev =
+        resource_user->links[list].prev;
+    resource_user->links[list].prev->links[list].next =
+        resource_user->links[list].next;
+    *root = resource_user->links[list].next;
+  }
+  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
+  return resource_user;
+}
+
+static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
+  if (resource_user->links[list].next == nullptr) return;
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  if (resource_quota->roots[list] == resource_user) {
+    resource_quota->roots[list] = resource_user->links[list].next;
+    if (resource_quota->roots[list] == resource_user) {
+      resource_quota->roots[list] = nullptr;
+    }
+  }
+  resource_user->links[list].next->links[list].prev =
+      resource_user->links[list].prev;
+  resource_user->links[list].prev->links[list].next =
+      resource_user->links[list].next;
+  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
+}
+
+/*******************************************************************************
+ * resource quota state machine
+ */
+
+static bool rq_alloc(grpc_resource_quota* resource_quota);
+static bool rq_reclaim_from_per_user_free_pool(
+    grpc_resource_quota* resource_quota);
+static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
+
+static void rq_step(void* rq, grpc_error* error) {
+  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
+  resource_quota->step_scheduled = false;
+  do {
+    if (rq_alloc(resource_quota)) goto done;
+  } while (rq_reclaim_from_per_user_free_pool(resource_quota));
+
+  if (!rq_reclaim(resource_quota, false)) {
+    rq_reclaim(resource_quota, true);
+  }
+
+done:
+  grpc_resource_quota_unref_internal(resource_quota);
+}
+
+static void rq_step_sched(grpc_resource_quota* resource_quota) {
+  if (resource_quota->step_scheduled) return;
+  resource_quota->step_scheduled = true;
+  grpc_resource_quota_ref_internal(resource_quota);
+  GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
+}
+
+/* update the atomically available resource estimate - use no barriers since
+   timeliness of delivery really doesn't matter much */
+static void rq_update_estimate(grpc_resource_quota* resource_quota) {
+  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
+  if (resource_quota->size != 0) {
+    memory_usage_estimation =
+        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
+                                       ((double)resource_quota->size)) *
+                            MEMORY_USAGE_ESTIMATION_MAX),
+                  0, MEMORY_USAGE_ESTIMATION_MAX);
+  }
+  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
+                           memory_usage_estimation);
+}
+
+/* returns true if all allocations are completed */
+static bool rq_alloc(grpc_resource_quota* resource_quota) {
+  grpc_resource_user* resource_user;
+  while ((resource_user = rulist_pop_head(resource_quota,
+                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
+    gpr_mu_lock(&resource_user->mu);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+      gpr_log(GPR_INFO,
+              "RQ: check allocation for user %p shutdown=%" PRIdPTR
+              " free_pool=%" PRId64,
+              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
+              resource_user->free_pool);
+    }
+    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
+      resource_user->allocating = false;
+      grpc_closure_list_fail_all(
+          &resource_user->on_allocated,
+          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
+      int64_t aborted_allocations = resource_user->outstanding_allocations;
+      resource_user->outstanding_allocations = 0;
+      resource_user->free_pool += aborted_allocations;
+      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
+      gpr_mu_unlock(&resource_user->mu);
+      ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
+      continue;
+    }
+    if (resource_user->free_pool < 0 &&
+        -resource_user->free_pool <= resource_quota->free_pool) {
+      int64_t amt = -resource_user->free_pool;
+      resource_user->free_pool = 0;
+      resource_quota->free_pool -= amt;
+      rq_update_estimate(resource_quota);
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+        gpr_log(GPR_INFO,
+                "RQ %s %s: grant alloc %" PRId64
+                " bytes; rq_free_pool -> %" PRId64,
+                resource_quota->name, resource_user->name, amt,
+                resource_quota->free_pool);
+      }
+    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
+               resource_user->free_pool >= 0) {
+      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
+              resource_quota->name, resource_user->name);
+    }
+    if (resource_user->free_pool >= 0) {
+      resource_user->allocating = false;
+      resource_user->outstanding_allocations = 0;
+      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
+      gpr_mu_unlock(&resource_user->mu);
+    } else {
+      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
+      gpr_mu_unlock(&resource_user->mu);
+      return false;
+    }
+  }
+  return true;
+}
+
+/* returns true if any memory could be reclaimed from buffers */
+static bool rq_reclaim_from_per_user_free_pool(
+    grpc_resource_quota* resource_quota) {
+  grpc_resource_user* resource_user;
+  while ((resource_user = rulist_pop_head(resource_quota,
+                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
+    gpr_mu_lock(&resource_user->mu);
+    resource_user->added_to_free_pool = false;
+    if (resource_user->free_pool > 0) {
+      int64_t amt = resource_user->free_pool;
+      resource_user->free_pool = 0;
+      resource_quota->free_pool += amt;
+      rq_update_estimate(resource_quota);
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+        gpr_log(GPR_INFO,
+                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
+                " bytes; rq_free_pool -> %" PRId64,
+                resource_quota->name, resource_user->name, amt,
+                resource_quota->free_pool);
+      }
+      gpr_mu_unlock(&resource_user->mu);
+      return true;
+    } else {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+        gpr_log(GPR_INFO,
+                "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
+                "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
+                resource_quota->name, resource_user->name,
+                resource_user->free_pool, resource_quota->free_pool);
+      }
+      gpr_mu_unlock(&resource_user->mu);
+    }
+  }
+  return false;
+}
+
+/* returns true if reclamation is proceeding */
+static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
+  if (resource_quota->reclaiming) return true;
+  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
+                                 : GRPC_RULIST_RECLAIMER_BENIGN;
+  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
+  if (resource_user == nullptr) return false;
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
+            resource_user->name, destructive ? "destructive" : "benign");
+  }
+  resource_quota->reclaiming = true;
+  grpc_resource_quota_ref_internal(resource_quota);
+  grpc_closure* c = resource_user->reclaimers[destructive];
+  GPR_ASSERT(c);
+  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
+  resource_quota->debug_only_last_initiated_reclaimer = c;
+  resource_user->reclaimers[destructive] = nullptr;
+  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
+  return true;
+}
+
+/*******************************************************************************
+ * ru_slice: a slice implementation that is backed by a grpc_resource_user
+ */
+
+namespace grpc_core {
+
+class RuSliceRefcount {
+ public:
+  static void Destroy(void* p) {
+    auto* rc = static_cast<RuSliceRefcount*>(p);
+    rc->~RuSliceRefcount();
+    gpr_free(rc);
+  }
+  RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
+      : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
+              &base_),
+        resource_user_(resource_user),
+        size_(size) {
+    // Nothing to do here.
+  }
+  ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }
+
+  grpc_slice_refcount* base_refcount() { return &base_; }
+
+ private:
+  grpc_slice_refcount base_;
+  RefCount refs_;
+  grpc_resource_user* resource_user_;
+  size_t size_;
+};
+
+}  // namespace grpc_core
+
+static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
+                                  size_t size) {
+  auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
+      gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
+  new (rc) grpc_core::RuSliceRefcount(resource_user, size);
+  grpc_slice slice;
+
+  slice.refcount = rc->base_refcount();
+  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
+  slice.data.refcounted.length = size;
+  return slice;
+}
+
+/*******************************************************************************
+ * grpc_resource_quota internal implementation: resource user manipulation under
+ * the combiner
+ */
+
+static void ru_allocate(void* ru, grpc_error* error) {
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  if (rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_AWAITING_ALLOCATION)) {
+    rq_step_sched(resource_user->resource_quota);
+  }
+  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
+}
+
+static void ru_add_to_free_pool(void* ru, grpc_error* error) {
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  if (!rulist_empty(resource_user->resource_quota,
+                    GRPC_RULIST_AWAITING_ALLOCATION) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
+    rq_step_sched(resource_user->resource_quota);
+  }
+  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
+}
+
+static bool ru_post_reclaimer(grpc_resource_user* resource_user,
+                              bool destructive) {
+  grpc_closure* closure = resource_user->new_reclaimers[destructive];
+  GPR_ASSERT(closure != nullptr);
+  resource_user->new_reclaimers[destructive] = nullptr;
+  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
+  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
+    GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
+    return false;
+  }
+  resource_user->reclaimers[destructive] = closure;
+  return true;
+}
+
+static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  if (!ru_post_reclaimer(resource_user, false)) return;
+  if (!rulist_empty(resource_user->resource_quota,
+                    GRPC_RULIST_AWAITING_ALLOCATION) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_RECLAIMER_BENIGN)) {
+    rq_step_sched(resource_user->resource_quota);
+  }
+  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+}
+
+static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  if (!ru_post_reclaimer(resource_user, true)) return;
+  if (!rulist_empty(resource_user->resource_quota,
+                    GRPC_RULIST_AWAITING_ALLOCATION) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_RECLAIMER_BENIGN) &&
+      rulist_empty(resource_user->resource_quota,
+                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
+    rq_step_sched(resource_user->resource_quota);
+  }
+  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
+}
+
+static void ru_shutdown(void* ru, grpc_error* error) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "RU shutdown %p", ru);
+  }
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  gpr_mu_lock(&resource_user->mu);
+  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
+  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
+  resource_user->reclaimers[0] = nullptr;
+  resource_user->reclaimers[1] = nullptr;
+  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
+  if (resource_user->allocating) {
+    rq_step_sched(resource_user->resource_quota);
+  }
+  gpr_mu_unlock(&resource_user->mu);
+}
+
+static void ru_destroy(void* ru, grpc_error* error) {
+  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
+  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
+  // Free all the remaining thread quota
+  grpc_resource_user_free_threads(resource_user,
+                                  static_cast<int>(gpr_atm_no_barrier_load(
+                                      &resource_user->num_threads_allocated)));
+
+  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
+    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
+  }
+  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
+  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
+  if (resource_user->free_pool != 0) {
+    resource_user->resource_quota->free_pool += resource_user->free_pool;
+    rq_step_sched(resource_user->resource_quota);
+  }
+  grpc_resource_quota_unref_internal(resource_user->resource_quota);
+  gpr_mu_destroy(&resource_user->mu);
+  gpr_free(resource_user->name);
+  gpr_free(resource_user);
+}
+
+static void ru_allocated_slices(void* arg, grpc_error* error) {
+  grpc_resource_user_slice_allocator* slice_allocator =
+      static_cast<grpc_resource_user_slice_allocator*>(arg);
+  if (error == GRPC_ERROR_NONE) {
+    for (size_t i = 0; i < slice_allocator->count; i++) {
+      grpc_slice_buffer_add_indexed(
+          slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
+                                                 slice_allocator->length));
+    }
+  }
+  GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
+}
+
+/*******************************************************************************
+ * grpc_resource_quota internal implementation: quota manipulation under the
+ * combiner
+ */
+
+typedef struct {
+  int64_t size;
+  grpc_resource_quota* resource_quota;
+  grpc_closure closure;
+} rq_resize_args;
+
+static void rq_resize(void* args, grpc_error* error) {
+  rq_resize_args* a = static_cast<rq_resize_args*>(args);
+  int64_t delta = a->size - a->resource_quota->size;
+  a->resource_quota->size += delta;
+  a->resource_quota->free_pool += delta;
+  rq_update_estimate(a->resource_quota);
+  rq_step_sched(a->resource_quota);
+  grpc_resource_quota_unref_internal(a->resource_quota);
+  gpr_free(a);
+}
+
+static void rq_reclamation_done(void* rq, grpc_error* error) {
+  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
+  resource_quota->reclaiming = false;
+  rq_step_sched(resource_quota);
+  grpc_resource_quota_unref_internal(resource_quota);
+}
+
+/*******************************************************************************
+ * grpc_resource_quota api
+ */
+
+/* Public API */
+grpc_resource_quota* grpc_resource_quota_create(const char* name) {
+  grpc_resource_quota* resource_quota =
+      static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
+  gpr_ref_init(&resource_quota->refs, 1);
+  resource_quota->combiner = grpc_combiner_create();
+  resource_quota->free_pool = INT64_MAX;
+  resource_quota->size = INT64_MAX;
+  resource_quota->used = 0;
+  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
+  gpr_mu_init(&resource_quota->thread_count_mu);
+  resource_quota->max_threads = INT_MAX;
+  resource_quota->num_threads_allocated = 0;
+  resource_quota->step_scheduled = false;
+  resource_quota->reclaiming = false;
+  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
+  if (name != nullptr) {
+    resource_quota->name = gpr_strdup(name);
+  } else {
+    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
+                 (intptr_t)resource_quota);
+  }
+  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
+                    grpc_combiner_finally_scheduler(resource_quota->combiner));
+  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
+                    rq_reclamation_done, resource_quota,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
+    resource_quota->roots[i] = nullptr;
+  }
+  return resource_quota;
+}
+
+void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
+  if (gpr_unref(&resource_quota->refs)) {
+    // No outstanding thread quota
+    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
+    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
+    gpr_free(resource_quota->name);
+    gpr_mu_destroy(&resource_quota->thread_count_mu);
+    gpr_free(resource_quota);
+  }
+}
+
+/* Public API */
+void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
+  grpc_core::ExecCtx exec_ctx;
+  grpc_resource_quota_unref_internal(resource_quota);
+}
+
+grpc_resource_quota* grpc_resource_quota_ref_internal(
+    grpc_resource_quota* resource_quota) {
+  gpr_ref(&resource_quota->refs);
+  return resource_quota;
+}
+
+/* Public API */
+void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
+  grpc_resource_quota_ref_internal(resource_quota);
+}
+
+double grpc_resource_quota_get_memory_pressure(
+    grpc_resource_quota* resource_quota) {
+  return (static_cast<double>(gpr_atm_no_barrier_load(
+             &resource_quota->memory_usage_estimation))) /
+         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
+}
+
+/* Public API */
+void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
+                                         int new_max_threads) {
+  GPR_ASSERT(new_max_threads >= 0);
+  gpr_mu_lock(&resource_quota->thread_count_mu);
+  resource_quota->max_threads = new_max_threads;
+  gpr_mu_unlock(&resource_quota->thread_count_mu);
+}
+
+/* Public API */
+void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
+                                size_t size) {
+  grpc_core::ExecCtx exec_ctx;
+  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
+  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
+  a->size = static_cast<int64_t>(size);
+  gpr_atm_no_barrier_store(&resource_quota->last_size,
+                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
+  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
+}
+
+size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
+  return static_cast<size_t>(
+      gpr_atm_no_barrier_load(&resource_quota->last_size));
+}
+
+/*******************************************************************************
+ * grpc_resource_user channel args api
+ */
+
+grpc_resource_quota* grpc_resource_quota_from_channel_args(
+    const grpc_channel_args* channel_args, bool create) {
+  for (size_t i = 0; i < channel_args->num_args; i++) {
+    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
+        return grpc_resource_quota_ref_internal(
+            static_cast<grpc_resource_quota*>(
+                channel_args->args[i].value.pointer.p));
+      } else {
+        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
+      }
+    }
+  }
+  return create ? grpc_resource_quota_create(nullptr) : nullptr;
+}
+
+static void* rq_copy(void* rq) {
+  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
+  return rq;
+}
+
+static void rq_destroy(void* rq) {
+  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
+}
+
+static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
+
+const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
+  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
+  return &vtable;
+}
+
+/*******************************************************************************
+ * grpc_resource_user api
+ */
+
+grpc_resource_user* grpc_resource_user_create(
+    grpc_resource_quota* resource_quota, const char* name) {
+  grpc_resource_user* resource_user =
+      static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
+  resource_user->resource_quota =
+      grpc_resource_quota_ref_internal(resource_quota);
+  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
+                    resource_user,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
+                    &ru_add_to_free_pool, resource_user,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
+                    &ru_post_benign_reclaimer, resource_user,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
+                    &ru_post_destructive_reclaimer, resource_user,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
+                    grpc_combiner_scheduler(resource_quota->combiner));
+  gpr_mu_init(&resource_user->mu);
+  gpr_atm_rel_store(&resource_user->refs, 1);
+  gpr_atm_rel_store(&resource_user->shutdown, 0);
+  resource_user->free_pool = 0;
+  grpc_closure_list_init(&resource_user->on_allocated);
+  resource_user->allocating = false;
+  resource_user->added_to_free_pool = false;
+  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
+  resource_user->reclaimers[0] = nullptr;
+  resource_user->reclaimers[1] = nullptr;
+  resource_user->new_reclaimers[0] = nullptr;
+  resource_user->new_reclaimers[1] = nullptr;
+  resource_user->outstanding_allocations = 0;
+  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
+    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
+  }
+  if (name != nullptr) {
+    resource_user->name = gpr_strdup(name);
+  } else {
+    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
+                 (intptr_t)resource_user);
+  }
+  return resource_user;
+}
+
+grpc_resource_quota* grpc_resource_user_quota(
+    grpc_resource_user* resource_user) {
+  return resource_user->resource_quota;
+}
+
+static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
+  GPR_ASSERT(amount > 0);
+  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
+}
+
+static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
+  GPR_ASSERT(amount > 0);
+  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
+  GPR_ASSERT(old >= amount);
+  if (old == amount) {
+    GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
+  }
+}
+
+void grpc_resource_user_ref(grpc_resource_user* resource_user) {
+  ru_ref_by(resource_user, 1);
+}
+
+void grpc_resource_user_unref(grpc_resource_user* resource_user) {
+  ru_unref_by(resource_user, 1);
+}
+
+void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
+  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
+    GRPC_CLOSURE_SCHED(
+        GRPC_CLOSURE_CREATE(
+            ru_shutdown, resource_user,
+            grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
+        GRPC_ERROR_NONE);
+  }
+}
+
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+                                         int thread_count) {
+  GPR_ASSERT(thread_count >= 0);
+  bool is_success = false;
+  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+  grpc_resource_quota* rq = resource_user->resource_quota;
+  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
+    rq->num_threads_allocated += thread_count;
+    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
+                                 thread_count);
+    is_success = true;
+  }
+  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+  return is_success;
+}
+
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+                                     int thread_count) {
+  GPR_ASSERT(thread_count >= 0);
+  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+  grpc_resource_quota* rq = resource_user->resource_quota;
+  rq->num_threads_allocated -= thread_count;
+  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
+      &resource_user->num_threads_allocated, -thread_count));
+  if (old_count < thread_count || rq->num_threads_allocated < 0) {
+    gpr_log(GPR_ERROR,
+            "Releasing more threads (%d) than currently allocated (rq threads: "
+            "%d, ru threads: %d)",
+            thread_count, rq->num_threads_allocated + thread_count, old_count);
+    abort();
+  }
+  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+}
+
+static void resource_user_alloc_locked(grpc_resource_user* resource_user,
+                                       size_t size,
+                                       grpc_closure* optional_on_done) {
+  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
+  resource_user->free_pool -= static_cast<int64_t>(size);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
+            resource_user->resource_quota->name, resource_user->name, size,
+            resource_user->free_pool);
+  }
+  if (resource_user->free_pool < 0) {
+    if (optional_on_done != nullptr) {
+      resource_user->outstanding_allocations += static_cast<int64_t>(size);
+      grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
+                               GRPC_ERROR_NONE);
+    }
+    if (!resource_user->allocating) {
+      resource_user->allocating = true;
+      GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
+    }
+  } else {
+    GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
+  }
+}
+
+bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
+                                   size_t size) {
+  if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
+  gpr_mu_lock(&resource_user->mu);
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  bool cas_success;
+  do {
+    gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
+    gpr_atm new_used = used + size;
+    if (static_cast<size_t>(new_used) >
+        grpc_resource_quota_peek_size(resource_quota)) {
+      gpr_mu_unlock(&resource_user->mu);
+      return false;
+    }
+    cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
+  } while (!cas_success);
+  resource_user_alloc_locked(resource_user, size, nullptr);
+  gpr_mu_unlock(&resource_user->mu);
+  return true;
+}
+
+void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
+                              grpc_closure* optional_on_done) {
+  // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
+  // because some tests become flaky after the change.
+  gpr_mu_lock(&resource_user->mu);
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
+  resource_user_alloc_locked(resource_user, size, optional_on_done);
+  gpr_mu_unlock(&resource_user->mu);
+}
+
+void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
+  gpr_mu_lock(&resource_user->mu);
+  grpc_resource_quota* resource_quota = resource_user->resource_quota;
+  gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
+  GPR_ASSERT(prior >= static_cast<long>(size));
+  bool was_zero_or_negative = resource_user->free_pool <= 0;
+  resource_user->free_pool += static_cast<int64_t>(size);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
+            resource_user->resource_quota->name, resource_user->name, size,
+            resource_user->free_pool);
+  }
+  bool is_bigger_than_zero = resource_user->free_pool > 0;
+  if (is_bigger_than_zero && was_zero_or_negative &&
+      !resource_user->added_to_free_pool) {
+    resource_user->added_to_free_pool = true;
+    GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
+                       GRPC_ERROR_NONE);
+  }
+  gpr_mu_unlock(&resource_user->mu);
+  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
+}
+
+void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
+                                       bool destructive,
+                                       grpc_closure* closure) {
+  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
+  resource_user->new_reclaimers[destructive] = closure;
+  GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
+                     GRPC_ERROR_NONE);
+}
+
+void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
+            resource_user->resource_quota->name, resource_user->name);
+  }
+  GRPC_CLOSURE_SCHED(
+      &resource_user->resource_quota->rq_reclamation_done_closure,
+      GRPC_ERROR_NONE);
+}
+
+void grpc_resource_user_slice_allocator_init(
+    grpc_resource_user_slice_allocator* slice_allocator,
+    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
+  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
+                    slice_allocator, grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
+                    grpc_schedule_on_exec_ctx);
+  slice_allocator->resource_user = resource_user;
+}
+
+void grpc_resource_user_alloc_slices(
+    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
+    size_t count, grpc_slice_buffer* dest) {
+  if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
+    GRPC_CLOSURE_SCHED(
+        &slice_allocator->on_allocated,
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
+    return;
+  }
+  slice_allocator->length = length;
+  slice_allocator->count = count;
+  slice_allocator->dest = dest;
+  grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
+                           &slice_allocator->on_allocated);
+}