legacy-libs/grpc-cloned/deps/grpc/src/core/lib/iomgr/resource_quota.cc
/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/slice/slice_internal.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
typedef struct {
  grpc_resource_user* next;
  grpc_resource_user* prev;
} grpc_resource_user_link;

/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;

struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non-empty free pool under the resource quota combiner
     lock */
  grpc_closure add_to_free_pool_closure;

  /* one ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* has this resource user been shut down? starts at 0, increases for each
     shutdown call */
  gpr_atm shutdown;

  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the quota
     can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - ie when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if not */
  bool allocating;
  /* The amount of memory (in bytes) that has been requested from this user
   * asynchronously but hasn't been granted yet. */
  int64_t outstanding_allocations;
  /* True if we are currently trying to add ourselves to the quota's non-empty
     free pool list, false otherwise */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
   */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to de-register and shutdown the
     resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  char* name;
};

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;
  /* Amount of memory currently in use in the resource quota. Updated as soon
   * as resource users allocate or free memory. */
  gpr_atm used;

  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided this mutex; but in that case, each invocation of
   * grpc_resource_user_allocate_threads() would have had to do at least two
   * atomic loads (for max_threads and num_threads_allocated) followed by a CAS
   * (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to be called
   * concurrently quite often, thereby increasing the chances of failing the
   * CAS operation. This additional complexity is not worth the tiny perf gain
   * we may (or may not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  char* name;
};

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

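/* Push a resource user onto the front of an intrusive circular list, making it
   the new root (head) for that list. */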
static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}

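/* Append a resource user to the back of an intrusive circular list by splicing
   it in just before the root; the root (head) is left unchanged. */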
static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

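/* Detach and return the head of a list, or nullptr if the list is empty. */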
static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

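/* Unlink a resource user from a list, if it is currently on it. */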
static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);

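/* One pass of the quota state machine (runs under the combiner): satisfy
   pending allocations, pulling back per-user free pools as needed, and fall
   back to benign then destructive reclamation if memory is still short. */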
static void rq_step(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

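/* Schedule rq_step on the combiner if it is not already scheduled; takes a
   quota ref that rq_step releases when it runs. */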
static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
}

/* update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}

/* returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name, resource_user->name);
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* returns true if any memory could be reclaimed from per-user free pools */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    resource_user->added_to_free_pool = false;
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
                "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
                resource_quota->name, resource_user->name,
                resource_user->free_pool, resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
            resource_user->name, destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

namespace grpc_core {

class RuSliceRefcount {
 public:
  static void Destroy(void* p) {
    auto* rc = static_cast<RuSliceRefcount*>(p);
    rc->~RuSliceRefcount();
    gpr_free(rc);
  }
  RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
      : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
              &base_),
        resource_user_(resource_user),
        size_(size) {
    // Nothing to do here.
  }
  ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }

  grpc_slice_refcount* base_refcount() { return &base_; }

 private:
  grpc_slice_refcount base_;
  RefCount refs_;
  grpc_resource_user* resource_user_;
  size_t size_;
};

}  // namespace grpc_core

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
      gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
  new (rc) grpc_core::RuSliceRefcount(resource_user, size);
  grpc_slice slice;

  slice.refcount = rc->base_refcount();
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

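/* Combiner callback: queue this resource user as awaiting allocation and kick
   the quota state machine if the queue was previously empty. */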
static void ru_allocate(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

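/* Combiner callback: cancel any posted reclaimers and drop this resource user
   from the reclaimer lists once shutdown has been requested. */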
static void ru_shutdown(void* ru, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

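/* Combiner callback run once the last ref is dropped: return any remaining
   thread and memory quota to the resource quota and free the user. */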
static void ru_destroy(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  gpr_free(resource_user->name);
  gpr_free(resource_user);
}

static void ru_allocated_slices(void* arg, grpc_error* error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) {
    for (size_t i = 0; i < slice_allocator->count; i++) {
      grpc_slice_buffer_add_indexed(
          slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                                 slice_allocator->length));
    }
  }
  GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

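/* Arguments for an asynchronous quota resize, applied by rq_resize below. */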
typedef struct {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
} rq_resize_args;

static void rq_resize(void* args, grpc_error* error) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota =
      static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  resource_quota->used = 0;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
                 (intptr_t)resource_quota);
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    grpc_combiner_finally_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota,
                    grpc_combiner_scheduler(resource_quota->combiner));
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_free(resource_quota->name);
    gpr_mu_destroy(&resource_quota->thread_count_mu);
    gpr_free(resource_quota);
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_user channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args, bool create) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return create ? grpc_resource_quota_create(nullptr) : nullptr;
}

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}
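
/* Illustrative sketch (not part of this file's API surface): a caller can
   attach a quota to a channel or server via channel args, assuming the public
   grpc_arg/grpc_channel_args types from grpc_types.h:

     grpc_arg arg;
     arg.type = GRPC_ARG_POINTER;
     arg.key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
     arg.value.pointer.p = quota;  // a grpc_resource_quota*
     arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();
     grpc_channel_args args = {1, &arg};

   grpc_resource_quota_from_channel_args() above is the reading side. */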

/*******************************************************************************
 * grpc_resource_user api
 */

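/* Rough lifecycle sketch for a resource user (illustrative only; error
   handling and closure wiring omitted): create it against a quota, allocate
   and free bytes against it, then shut it down and drop the ref:

     grpc_resource_user* ru = grpc_resource_user_create(quota, "my_user");
     grpc_resource_user_alloc(ru, 1024, on_allocated_closure);
     ...
     grpc_resource_user_free(ru, 1024);
     grpc_resource_user_shutdown(ru);
     grpc_resource_user_unref(ru);

   All of these entry points are defined below. */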
grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user =
      static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
                 (intptr_t)resource_user);
  }
  return resource_user;
}

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(
            ru_shutdown, resource_user,
            grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
        GRPC_ERROR_NONE);
  }
}

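/* Best-effort thread reservation: either all thread_count threads are reserved
   against the quota's max_threads limit (returns true) or none are (returns
   false). */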
bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated (rq threads: "
            "%d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}

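/* Must be called with resource_user->mu held: debit the user's free pool and,
   if it goes negative, queue optional_on_done and schedule an allocation
   request against the quota. */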
static void resource_user_alloc_locked(grpc_resource_user* resource_user,
                                       size_t size,
                                       grpc_closure* optional_on_done) {
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  if (resource_user->free_pool < 0) {
    if (optional_on_done != nullptr) {
      resource_user->outstanding_allocations += static_cast<int64_t>(size);
      grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                               GRPC_ERROR_NONE);
    }
    if (!resource_user->allocating) {
      resource_user->allocating = true;
      GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
    }
  } else {
    GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
                                   size_t size) {
  if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  bool cas_success;
  do {
    gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
    gpr_atm new_used = used + size;
    if (static_cast<size_t>(new_used) >
        grpc_resource_quota_peek_size(resource_quota)) {
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
    cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
  } while (!cas_success);
  resource_user_alloc_locked(resource_user, size, nullptr);
  gpr_mu_unlock(&resource_user->mu);
  return true;
}

void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
  // because some tests become flaky after the change.
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
  resource_user_alloc_locked(resource_user, size, optional_on_done);
  gpr_mu_unlock(&resource_user->mu);
}

void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
  GPR_ASSERT(prior >= static_cast<long>(size));
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
                       GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = closure;
  GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
                     GRPC_ERROR_NONE);
}

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name, resource_user->name);
  }
  GRPC_CLOSURE_SCHED(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

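/* Allocate count slices of the given length through the slice allocator;
   on_allocated (which fills dest) runs once the quota grants the memory. */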
void grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
    GRPC_CLOSURE_SCHED(
        &slice_allocator->on_allocated,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    return;
  }
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                           &slice_allocator->on_allocated);
}