3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/resource_quota.h"
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/iomgr/combiner.h"
35 #include "src/core/lib/slice/slice_internal.h"
// Trace flag registered under the name "resource_quota". Every
// GRPC_TRACE_FLAG_ENABLED check in this file gates its log line on it.
// It is constructed with `false` (presumably "off by default" - confirm
// against grpc_core::TraceFlag).
37 grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
// Full-scale value of the memory usage estimate: rq_update_estimate clamps its
// result into [0, MEMORY_USAGE_ESTIMATION_MAX] and
// grpc_resource_quota_get_memory_pressure divides by it to return a fraction.
39 #define MEMORY_USAGE_ESTIMATION_MAX 65536
41 /* Internal linked list pointers for a resource user */
// One next/prev pair. grpc_resource_user keeps one pair per list in
// links[GRPC_RULIST_COUNT]. grpc_resource_user_create sets both pointers to
// nullptr, and rulist_remove treats next == nullptr as "not on this list".
// NOTE(review): the opening line of this typedef is not visible in this view.
43 grpc_resource_user* next;
44 grpc_resource_user* prev;
45 } grpc_resource_user_link;
47 /* Resource users are kept in (potentially) several intrusive linked lists
48 at once. These are the list names. */
// Each enumerator is used as an index into grpc_resource_user::links and
// grpc_resource_quota::roots.
// NOTE(review): the line that opens this enum, the GRPC_RULIST_COUNT
// enumerator and the closing line are not visible in this view. The comment in
// front of GRPC_RULIST_RECLAIMER_DESTRUCTIVE also has no visible closing
// delimiter, so that enumerator is lexically inside a comment here. Confirm
// against the complete file.
50 /* Resource users that are waiting for an allocation */
51 GRPC_RULIST_AWAITING_ALLOCATION,
52 /* Resource users that have free memory available for internal reclamation */
53 GRPC_RULIST_NON_EMPTY_FREE_POOL,
54 /* Resource users that have published a benign reclamation is available */
55 GRPC_RULIST_RECLAIMER_BENIGN,
56 /* Resource users that have published a destructive reclamation is
58 GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
59 /* Number of lists: must be last */
// Book-keeping for one consumer of a grpc_resource_quota.
// NOTE(review): in this view several comments below have no visible closing
// delimiter, so the declaration that follows each of them is lexically part of
// the comment. The members refs, shutdown, mu, free_pool, allocating and name,
// which the functions in this file read and write, have no visible
// declaration either. Confirm the layout against the complete file.
63 struct grpc_resource_user {
64 /* The quota this resource user consumes from */
65 grpc_resource_quota* resource_quota;
67 /* Closure to schedule an allocation under the resource quota combiner lock */
// Bound to ru_allocate by grpc_resource_user_create.
68 grpc_closure allocate_closure;
69 /* Closure to publish a non empty free pool under the resource quota combiner
// Bound to ru_add_to_free_pool by grpc_resource_user_create.
71 grpc_closure add_to_free_pool_closure;
73 /* one ref for each ref call (released by grpc_resource_user_unref), and one
74 ref for each byte allocated (released by grpc_resource_user_free) */
76 /* is this resource user shut down? starts at 0, increases for each shutdown
81 /* The amount of memory (in bytes) this user has cached for its own use: to
82 avoid quota contention, each resource user can keep some memory in
83 addition to what it is immediately using (e.g., for caching), and the quota
84 can pull it back under memory pressure.
85 This value can become negative if more memory has been requested than
86 existed in the free pool, at which point the quota is consulted to bring
87 this value non-negative (asynchronously). */
89 /* A list of closures to call once free_pool becomes non-negative - ie when
90 all outstanding allocations have been granted. */
91 grpc_closure_list on_allocated;
92 /* True if we are currently trying to allocate from the quota, false if not */
94 /* The amount of memory (in bytes) that has been requested from this user
95 * asynchronously but hasn't been granted yet. */
// Only grows for allocations that supplied a completion closure (see
// resource_user_alloc_locked), so it can be 0 while the user is waiting.
96 int64_t outstanding_allocations;
97 /* True if we are currently trying to add ourselves to the non-free quota
98 list, false otherwise */
99 bool added_to_free_pool;
101 /* The number of threads currently allocated to this resource user */
102 gpr_atm num_threads_allocated;
104 /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
106 grpc_closure* reclaimers[2];
107 /* Reclaimers just posted: once we're in the combiner lock, we'll move them
108 to the array above */
109 grpc_closure* new_reclaimers[2];
110 /* Trampoline closures to finish reclamation and re-enter the quota combiner
// Index 0 is bound to ru_post_benign_reclaimer, index 1 to
// ru_post_destructive_reclaimer (see grpc_resource_user_create).
112 grpc_closure post_reclaimer_closure[2];
114 /* Closure to execute under the quota combiner to de-register and shutdown the
// Bound to ru_destroy; ru_unref_by schedules it when the last ref is dropped.
116 grpc_closure destroy_closure;
118 /* Links in the various grpc_rulist lists */
119 grpc_resource_user_link links[GRPC_RULIST_COUNT];
121 /* The name of this resource user, for debugging/tracing */
// Shared pool of memory and thread budget that resource users draw from.
// NOTE(review): the members refs, size, free_pool, used, last_size,
// max_threads, step_scheduled, reclaiming and name are initialised by
// grpc_resource_quota_create, but their declarations are not visible in this
// view; only their comments remain below. Confirm against the complete file.
125 struct grpc_resource_quota {
129 /* estimate of current memory usage
130 scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
// Written by rq_update_estimate and read by
// grpc_resource_quota_get_memory_pressure, both without barriers.
131 gpr_atm memory_usage_estimation;
133 /* Master combiner lock: all activity on a quota executes under this combiner
134 * (so no mutex is needed for this data structure) */
135 grpc_combiner* combiner;
136 /* Size of the resource quota */
138 /* Amount of free memory in the resource quota */
140 /* Used size of memory in the resource quota. Updated as soon as the resource
141 * users start to allocate or free the memory. */
146 /* Mutex to protect max_threads and num_threads_allocated */
147 /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
148 * and avoid having this mutex; but in that case, each invocation of the
149 * function grpc_resource_user_allocate_threads() would have had to do at
150 * least two atomic loads (for max_threads and num_threads_allocated) followed
151 * by a CAS (on num_threads_allocated).
152 * Moreover, we expect grpc_resource_user_allocate_threads() to be often
153 * called concurrently thereby increasing the chances of failing the CAS
154 * operation. This additional complexity is not worth the tiny perf gain we
155 * may (or may not) have by using atomics */
156 gpr_mu thread_count_mu;
158 /* Max number of threads allowed */
161 /* Number of threads currently allocated via this resource_quota object */
162 int num_threads_allocated;
164 /* Has rq_step been scheduled to occur? */
167 /* Are we currently reclaiming memory */
170 /* Closure around rq_step */
171 grpc_closure rq_step_closure;
173 /* Closure around rq_reclamation_done */
174 grpc_closure rq_reclamation_done_closure;
176 /* This is only really usable for debugging: it's always a stale pointer, but
177 a stale pointer that might just be fresh enough to guide us to where the
178 reclamation system is stuck */
// Both are written by rq_reclaim just before it schedules a reclaimer.
179 grpc_closure* debug_only_last_initiated_reclaimer;
180 grpc_resource_user* debug_only_last_reclaimer_resource_user;
182 /* Roots of all resource user lists */
// One root per grpc_rulist value; nullptr means the list is empty.
183 grpc_resource_user* roots[GRPC_RULIST_COUNT];
// Forward declaration: rq_alloc, which is defined before ru_unref_by, uses it
// to drop the refs of allocations it aborts for a shut-down user.
188 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
190 /*******************************************************************************
 * list management: intrusive, circular, doubly-linked lists of resource users
 */
// Links resource_user into list `list` of its quota and makes it the new root,
// so the next rulist_pop_head on that list returns it.
// NOTE(review): the second parameter line, the right-hand side of the
// self-link assignment, the separator between the empty and non-empty cases
// and the closing braces are not visible in this view.
194 static void rulist_add_head(grpc_resource_user* resource_user,
196 grpc_resource_quota* resource_quota = resource_user->resource_quota;
197 grpc_resource_user** root = &resource_quota->roots[list];
// Empty list: the user becomes the root.
198 if (*root == nullptr) {
199 *root = resource_user;
200 resource_user->links[list].next = resource_user->links[list].prev =
// Non-empty list: splice the user in between the old root's predecessor and
// the old root, then move the root to the new element.
203 resource_user->links[list].next = *root;
204 resource_user->links[list].prev = (*root)->links[list].prev;
205 resource_user->links[list].next->links[list].prev =
206 resource_user->links[list].prev->links[list].next = resource_user;
207 *root = resource_user;
// Links resource_user into list `list` of its quota without changing the root.
// In the non-empty case the user is inserted directly after the current root
// (between root and root->next).
// NOTE(review): rulist_pop_head advances the root along `next`, so an element
// added here is popped right after the current root rather than last. The last
// position would be root->prev. Confirm that this ordering is intended.
// NOTE(review): the second parameter line, the right-hand side of the
// self-link assignment, the case separator and the closing braces are not
// visible in this view.
211 static void rulist_add_tail(grpc_resource_user* resource_user,
213 grpc_resource_quota* resource_quota = resource_user->resource_quota;
214 grpc_resource_user** root = &resource_quota->roots[list];
// Empty list: the user becomes the root.
215 if (*root == nullptr) {
216 *root = resource_user;
217 resource_user->links[list].next = resource_user->links[list].prev =
// Non-empty list: new element sits between root and root's old successor.
220 resource_user->links[list].next = (*root)->links[list].next;
221 resource_user->links[list].prev = *root;
222 resource_user->links[list].next->links[list].prev =
223 resource_user->links[list].prev->links[list].next = resource_user;
// Returns true when list `list` of resource_quota has no root, i.e. no
// resource user is linked on it.
// NOTE(review): the second parameter line is not visible in this view.
227 static bool rulist_empty(grpc_resource_quota* resource_quota,
229 return resource_quota->roots[list] == nullptr;
// Unlinks and returns the root of list `list`. The popped user's link
// pointers for that list are reset to nullptr before it is returned.
// NOTE(review): the second parameter line, the body of the empty-list check
// (presumably returning nullptr, since callers loop until a null result), the
// body of the single-element case and the case separator are not visible in
// this view.
232 static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
234 grpc_resource_user** root = &resource_quota->roots[list];
235 grpc_resource_user* resource_user = *root;
236 if (resource_user == nullptr) {
// A user whose next pointer is itself is the only element of the list.
239 if (resource_user->links[list].next == resource_user) {
// Otherwise bridge the neighbours and advance the root to the successor.
242 resource_user->links[list].next->links[list].prev =
243 resource_user->links[list].prev;
244 resource_user->links[list].prev->links[list].next =
245 resource_user->links[list].next;
246 *root = resource_user->links[list].next;
// nullptr links mark the user as "not on this list" for rulist_remove.
248 resource_user->links[list].next = resource_user->links[list].prev = nullptr;
249 return resource_user;
// Unlinks resource_user from list `list` if it is on it; does nothing when the
// user's next pointer for that list is nullptr.
// NOTE(review): the closing braces of the nested conditions are not visible in
// this view.
252 static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
253 if (resource_user->links[list].next == nullptr) return;
254 grpc_resource_quota* resource_quota = resource_user->resource_quota;
// If the user is the root, move the root to its successor. If the successor is
// the user itself, it was the only element and the list becomes empty.
255 if (resource_quota->roots[list] == resource_user) {
256 resource_quota->roots[list] = resource_user->links[list].next;
257 if (resource_quota->roots[list] == resource_user) {
258 resource_quota->roots[list] = nullptr;
// Bridge the neighbours, then clear the user's own links.
261 resource_user->links[list].next->links[list].prev =
262 resource_user->links[list].prev;
263 resource_user->links[list].prev->links[list].next =
264 resource_user->links[list].next;
265 resource_user->links[list].next = resource_user->links[list].prev = nullptr;
268 /*******************************************************************************
269 * resource quota state machine
 */
// Forward declarations of the three stages that rq_step drives, in the order
// it tries them: allocate, reclaim from per-user free pools, run a reclaimer.
272 static bool rq_alloc(grpc_resource_quota* resource_quota);
273 static bool rq_reclaim_from_per_user_free_pool(
274 grpc_resource_quota* resource_quota);
275 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
// One pass of the quota state machine. grpc_resource_quota_create binds it to
// rq_step_closure on the quota combiner's "finally" scheduler.
// `rq` is the grpc_resource_quota; `error` is not used by the visible code.
// NOTE(review): the loop opener, the `done` label targeted by the goto and the
// closing braces are not visible in this view.
277 static void rq_step(void* rq, grpc_error* error) {
278 grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
// Allow the next rq_step_sched call to schedule another pass.
279 resource_quota->step_scheduled = false;
// rq_alloc returns true when no user is left waiting, which ends the pass.
281 if (rq_alloc(resource_quota)) goto done;
// Retry allocation for as long as per-user free pools give memory back.
282 } while (rq_reclaim_from_per_user_free_pool(resource_quota));
// Try a benign reclaimer first and a destructive one only if none could be
// started.
284 if (!rq_reclaim(resource_quota, false)) {
285 rq_reclaim(resource_quota, true);
// Releases the ref taken by rq_step_sched.
289 grpc_resource_quota_unref_internal(resource_quota);
// Schedules rq_step_closure unless a pass is already pending. The quota ref
// taken here is released at the end of rq_step.
292 static void rq_step_sched(grpc_resource_quota* resource_quota) {
293 if (resource_quota->step_scheduled) return;
294 resource_quota->step_scheduled = true;
295 grpc_resource_quota_ref_internal(resource_quota);
296 GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
299 /* update the atomically available resource estimate - use no barriers since
300 timeliness of delivery really doesn't matter much */
// The estimate is (1 - free_pool / size) scaled by MEMORY_USAGE_ESTIMATION_MAX
// and clamped to [0, MEMORY_USAGE_ESTIMATION_MAX]. When size is 0 the division
// is skipped and the estimate stays at the maximum.
301 static void rq_update_estimate(grpc_resource_quota* resource_quota) {
302 gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
303 if (resource_quota->size != 0) {
304 memory_usage_estimation =
305 GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
306 ((double)resource_quota->size)) *
307 MEMORY_USAGE_ESTIMATION_MAX),
308 0, MEMORY_USAGE_ESTIMATION_MAX);
310 gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
311 memory_usage_estimation);
314 /* returns true if all allocations are completed */
315 static bool rq_alloc(grpc_resource_quota* resource_quota) {
316 grpc_resource_user* resource_user;
317 while ((resource_user = rulist_pop_head(resource_quota,
318 GRPC_RULIST_AWAITING_ALLOCATION))) {
319 gpr_mu_lock(&resource_user->mu);
320 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
322 "RQ: check allocation for user %p shutdown=%" PRIdPTR
323 " free_pool=%" PRId64,
324 resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
325 resource_user->free_pool);
327 if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
328 resource_user->allocating = false;
329 grpc_closure_list_fail_all(
330 &resource_user->on_allocated,
331 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
332 int64_t aborted_allocations = resource_user->outstanding_allocations;
333 resource_user->outstanding_allocations = 0;
334 resource_user->free_pool += aborted_allocations;
335 GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
336 gpr_mu_unlock(&resource_user->mu);
337 ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
340 if (resource_user->free_pool < 0 &&
341 -resource_user->free_pool <= resource_quota->free_pool) {
342 int64_t amt = -resource_user->free_pool;
343 resource_user->free_pool = 0;
344 resource_quota->free_pool -= amt;
345 rq_update_estimate(resource_quota);
346 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
348 "RQ %s %s: grant alloc %" PRId64
349 " bytes; rq_free_pool -> %" PRId64,
350 resource_quota->name, resource_user->name, amt,
351 resource_quota->free_pool);
353 } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
354 resource_user->free_pool >= 0) {
355 gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
356 resource_quota->name, resource_user->name);
358 if (resource_user->free_pool >= 0) {
359 resource_user->allocating = false;
360 resource_user->outstanding_allocations = 0;
361 GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
362 gpr_mu_unlock(&resource_user->mu);
364 rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
365 gpr_mu_unlock(&resource_user->mu);
372 /* returns true if any memory could be reclaimed from buffers */
// Pops users off the NON_EMPTY_FREE_POOL list. A user with a positive
// free_pool has all of it moved into the quota's free pool. A user with
// nothing to give is only logged (when tracing) and dropped from the list.
// NOTE(review): the gpr_log call heads, the return statements and the closing
// braces are not visible in this view. Presumably the function returns true
// right after the first successful transfer and false once the list is
// exhausted - confirm.
373 static bool rq_reclaim_from_per_user_free_pool(
374 grpc_resource_quota* resource_quota) {
375 grpc_resource_user* resource_user;
376 while ((resource_user = rulist_pop_head(resource_quota,
377 GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
378 gpr_mu_lock(&resource_user->mu);
// The user is off the list now, so it may publish itself again later.
379 resource_user->added_to_free_pool = false;
380 if (resource_user->free_pool > 0) {
381 int64_t amt = resource_user->free_pool;
382 resource_user->free_pool = 0;
383 resource_quota->free_pool += amt;
384 rq_update_estimate(resource_quota);
385 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
387 "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
388 " bytes; rq_free_pool -> %" PRId64,
389 resource_quota->name, resource_user->name, amt,
390 resource_quota->free_pool);
392 gpr_mu_unlock(&resource_user->mu);
395 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
397 "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
398 "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
399 resource_quota->name, resource_user->name,
400 resource_user->free_pool, resource_quota->free_pool);
402 gpr_mu_unlock(&resource_user->mu);
408 /* returns true if reclamation is proceeding */
// Starts at most one reclaimer. If a reclamation is already in flight it
// reports true without doing anything. Otherwise it pops one user from the
// benign or destructive reclaimer list and schedules that user's registered
// closure; false means the chosen list was empty.
// NOTE(review): the final return and some closing braces are not visible in
// this view. The quota ref taken below is presumably released by
// rq_reclamation_done - the code that schedules rq_reclamation_done_closure is
// not part of this view.
409 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
410 if (resource_quota->reclaiming) return true;
411 grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
412 : GRPC_RULIST_RECLAIMER_BENIGN;
413 grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
414 if (resource_user == nullptr) return false;
415 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
416 gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
417 resource_user->name, destructive ? "destructive" : "benign");
419 resource_quota->reclaiming = true;
420 grpc_resource_quota_ref_internal(resource_quota);
// `destructive` doubles as the index: 0 is benign, 1 is destructive.
421 grpc_closure* c = resource_user->reclaimers[destructive];
// Remember what was started, for debugging a stuck reclamation.
423 resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
424 resource_quota->debug_only_last_initiated_reclaimer = c;
// Clear the slot before the closure is scheduled.
425 resource_user->reclaimers[destructive] = nullptr;
426 GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
430 /*******************************************************************************
431 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */
434 namespace grpc_core {
// Refcount header for a slice whose bytes are accounted to a resource user.
// ru_slice_create constructs it at the front of one gpr_malloc block and uses
// the memory right behind it as the slice data. Destroying it hands size_
// bytes back to the resource user through grpc_resource_user_free.
// NOTE(review): the access specifiers, the last base_ constructor argument,
// the size_ initialiser and the refs_ / size_ member declarations are not
// visible in this view.
436 class RuSliceRefcount {
// Destroy callback handed to base_: runs the destructor in place on the object
// passed as `p`. Release of the underlying storage is not visible in this
// view - confirm.
438 static void Destroy(void* p) {
439 auto* rc = static_cast<RuSliceRefcount*>(p);
440 rc->~RuSliceRefcount();
// Wires base_ to this object's counter and to Destroy, and remembers which
// resource user the bytes belong to.
443 RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
444 : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
446 resource_user_(resource_user),
448 // Nothing to do here.
450 ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }
// The generic refcount that is stored in grpc_slice::refcount.
452 grpc_slice_refcount* base_refcount() { return &base_; }
455 grpc_slice_refcount base_;
457 grpc_resource_user* resource_user_;
461 } // namespace grpc_core
// Builds a refcounted slice of `size` bytes tied to resource_user. A single
// gpr_malloc block holds the RuSliceRefcount header followed by the payload.
// This function does not charge the bytes to the resource user; only the
// release in the RuSliceRefcount destructor is visible here.
// NOTE(review): the second parameter line, the declaration of `slice` and the
// return statement are not visible in this view.
463 static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
465 auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
466 gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
// Placement-new the header at the start of the block.
467 new (rc) grpc_core::RuSliceRefcount(resource_user, size);
470 slice.refcount = rc->base_refcount();
// The payload starts immediately after the header object.
471 slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
472 slice.data.refcounted.length = size;
476 /*******************************************************************************
477 * grpc_resource_quota internal implementation: resource user manipulation under
 * the quota combiner
 */
// Body of allocate_closure: puts the user on the AWAITING_ALLOCATION list.
// A state machine pass is scheduled only when the list was empty before.
// `ru` is the grpc_resource_user; `error` is not used.
481 static void ru_allocate(void* ru, grpc_error* error) {
482 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
483 if (rulist_empty(resource_user->resource_quota,
484 GRPC_RULIST_AWAITING_ALLOCATION)) {
485 rq_step_sched(resource_user->resource_quota);
487 rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
// Body of add_to_free_pool_closure: publishes the user on the
// NON_EMPTY_FREE_POOL list. A pass is scheduled only when some user is waiting
// for memory and this is the first entry on the free-pool list.
// `ru` is the grpc_resource_user; `error` is not used.
490 static void ru_add_to_free_pool(void* ru, grpc_error* error) {
491 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
492 if (!rulist_empty(resource_user->resource_quota,
493 GRPC_RULIST_AWAITING_ALLOCATION) &&
494 rulist_empty(resource_user->resource_quota,
495 GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
496 rq_step_sched(resource_user->resource_quota);
498 rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
// Moves the freshly posted reclaimer from new_reclaimers[destructive] into
// reclaimers[destructive]. Asserts that a new reclaimer is present and that
// the destination slot is free. If the user has been shut down the closure is
// scheduled with GRPC_ERROR_CANCELLED instead of being stored.
// NOTE(review): the second parameter line and both return statements are not
// visible in this view. The callers bail out on a false result, so presumably
// false is returned on the shutdown path and true otherwise - confirm.
501 static bool ru_post_reclaimer(grpc_resource_user* resource_user,
503 grpc_closure* closure = resource_user->new_reclaimers[destructive];
504 GPR_ASSERT(closure != nullptr);
505 resource_user->new_reclaimers[destructive] = nullptr;
506 GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
507 if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
508 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
511 resource_user->reclaimers[destructive] = closure;
// Body of post_reclaimer_closure[0]: registers the benign reclaimer and puts
// the user on the RECLAIMER_BENIGN list. A pass is scheduled only when a user
// is waiting for memory and both cheaper sources (per-user free pools, other
// benign reclaimers) were empty before this call.
// `ru` is the grpc_resource_user; `error` is not used.
515 static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
516 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
517 if (!ru_post_reclaimer(resource_user, false)) return;
518 if (!rulist_empty(resource_user->resource_quota,
519 GRPC_RULIST_AWAITING_ALLOCATION) &&
520 rulist_empty(resource_user->resource_quota,
521 GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
522 rulist_empty(resource_user->resource_quota,
523 GRPC_RULIST_RECLAIMER_BENIGN)) {
524 rq_step_sched(resource_user->resource_quota);
526 rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
// Body of post_reclaimer_closure[1]: registers the destructive reclaimer and
// puts the user on the RECLAIMER_DESTRUCTIVE list. A pass is scheduled only
// when a user is waiting for memory and the free-pool, benign and destructive
// lists were all empty before this call.
// `ru` is the grpc_resource_user; `error` is not used.
529 static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
530 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
531 if (!ru_post_reclaimer(resource_user, true)) return;
532 if (!rulist_empty(resource_user->resource_quota,
533 GRPC_RULIST_AWAITING_ALLOCATION) &&
534 rulist_empty(resource_user->resource_quota,
535 GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
536 rulist_empty(resource_user->resource_quota,
537 GRPC_RULIST_RECLAIMER_BENIGN) &&
538 rulist_empty(resource_user->resource_quota,
539 GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
540 rq_step_sched(resource_user->resource_quota);
542 rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
// Shutdown work for a resource user, scheduled on the quota combiner by
// grpc_resource_user_shutdown. With the user's mutex held it cancels both
// registered reclaimers, removes the user from both reclaimer lists, and
// schedules a state machine pass if the user is still waiting for memory so
// that rq_alloc can abort its pending allocations.
// `ru` is the grpc_resource_user; `error` is not used.
545 static void ru_shutdown(void* ru, grpc_error* error) {
546 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
547 gpr_log(GPR_INFO, "RU shutdown %p", ru);
549 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
550 gpr_mu_lock(&resource_user->mu);
// Either slot may be nullptr here. This assumes GRPC_CLOSURE_SCHED tolerates a
// null closure - TODO confirm.
551 GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
552 GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
553 resource_user->reclaimers[0] = nullptr;
554 resource_user->reclaimers[1] = nullptr;
555 rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
556 rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
557 if (resource_user->allocating) {
558 rq_step_sched(resource_user->resource_quota);
560 gpr_mu_unlock(&resource_user->mu);
// Body of destroy_closure, scheduled by ru_unref_by when the ref count reaches
// zero. It returns the user's thread budget, unlinks it from every list,
// cancels any reclaimers that are still registered, hands a non-zero free_pool
// back to the quota (scheduling a pass), drops the user's ref on the quota and
// frees the user.
// `ru` is the grpc_resource_user; `error` is not used.
563 static void ru_destroy(void* ru, grpc_error* error) {
564 grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
565 GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
566 // Free all the remaining thread quota
567 grpc_resource_user_free_threads(resource_user,
568 static_cast<int>(gpr_atm_no_barrier_load(
569 &resource_user->num_threads_allocated)));
571 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
572 rulist_remove(resource_user, static_cast<grpc_rulist>(i));
574 GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
575 GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
// free_pool is added as-is, so a negative balance lowers the quota's pool.
576 if (resource_user->free_pool != 0) {
577 resource_user->resource_quota->free_pool += resource_user->free_pool;
578 rq_step_sched(resource_user->resource_quota);
// Matches the ref taken in grpc_resource_user_create.
580 grpc_resource_quota_unref_internal(resource_user->resource_quota);
581 gpr_mu_destroy(&resource_user->mu);
582 gpr_free(resource_user->name);
583 gpr_free(resource_user);
// Completion callback for a slice allocator request. On success it appends
// `count` slices of `length` bytes each to the destination buffer. In every
// case it then runs the allocator's on_done closure with a ref of `error`.
// `arg` is the grpc_resource_user_slice_allocator.
586 static void ru_allocated_slices(void* arg, grpc_error* error) {
587 grpc_resource_user_slice_allocator* slice_allocator =
588 static_cast<grpc_resource_user_slice_allocator*>(arg);
589 if (error == GRPC_ERROR_NONE) {
590 for (size_t i = 0; i < slice_allocator->count; i++) {
591 grpc_slice_buffer_add_indexed(
592 slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
593 slice_allocator->length));
596 GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
599 /*******************************************************************************
600 * grpc_resource_quota internal implementation: quota manipulation under the
 * quota combiner
 */
// Heap-allocated argument bundle for one rq_resize call. It is filled in by
// grpc_resource_quota_resize, which also takes the quota ref stored here.
// NOTE(review): the typedef opener, the `size` member that both functions use
// and the closing line are not visible in this view.
606 grpc_resource_quota* resource_quota;
607 grpc_closure closure;
// Applies a requested size change. The difference between the requested and
// the current size is added to both size and free_pool, so free_pool may go
// negative when shrinking. It then refreshes the usage estimate, schedules a
// pass and drops the quota ref held by the argument bundle.
// `args` is an rq_resize_args; `error` is not used.
// NOTE(review): release of the argument bundle itself is not visible in this
// view - confirm it is freed.
610 static void rq_resize(void* args, grpc_error* error) {
611 rq_resize_args* a = static_cast<rq_resize_args*>(args);
612 int64_t delta = a->size - a->resource_quota->size;
613 a->resource_quota->size += delta;
614 a->resource_quota->free_pool += delta;
615 rq_update_estimate(a->resource_quota);
616 rq_step_sched(a->resource_quota);
617 grpc_resource_quota_unref_internal(a->resource_quota);
// Body of rq_reclamation_done_closure: marks the in-flight reclamation as
// finished, schedules another pass and drops one quota ref (presumably the one
// taken in rq_reclaim - confirm).
// `rq` is the grpc_resource_quota; `error` is not used.
621 static void rq_reclamation_done(void* rq, grpc_error* error) {
622 grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
623 resource_quota->reclaiming = false;
624 rq_step_sched(resource_quota);
625 grpc_resource_quota_unref_internal(resource_quota);
628 /*******************************************************************************
629 * grpc_resource_quota api
 */
// Allocates a quota holding one ref. size and free_pool start at INT64_MAX,
// max_threads at INT_MAX and the pressure estimate at 0.
// `name` may be nullptr; the quota then names itself "anonymous_pool_<addr>".
// Returns the new quota.
// NOTE(review): the debug_only_* members are not initialised here and the
// storage comes from gpr_malloc, so they hold indeterminate values until
// rq_reclaim first writes them.
633 grpc_resource_quota* grpc_resource_quota_create(const char* name) {
634 grpc_resource_quota* resource_quota =
635 static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
636 gpr_ref_init(&resource_quota->refs, 1);
637 resource_quota->combiner = grpc_combiner_create();
638 resource_quota->free_pool = INT64_MAX;
639 resource_quota->size = INT64_MAX;
640 resource_quota->used = 0;
// last_size is the snapshot returned by grpc_resource_quota_peek_size.
641 gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
642 gpr_mu_init(&resource_quota->thread_count_mu);
643 resource_quota->max_threads = INT_MAX;
644 resource_quota->num_threads_allocated = 0;
645 resource_quota->step_scheduled = false;
646 resource_quota->reclaiming = false;
647 gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
648 if (name != nullptr) {
649 resource_quota->name = gpr_strdup(name);
651 gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
652 (intptr_t)resource_quota);
// rq_step uses the combiner's "finally" scheduler; rq_reclamation_done uses
// the combiner's regular scheduler.
654 GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
655 grpc_combiner_finally_scheduler(resource_quota->combiner));
656 GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
657 rq_reclamation_done, resource_quota,
658 grpc_combiner_scheduler(resource_quota->combiner));
// All resource user lists start empty.
659 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
660 resource_quota->roots[i] = nullptr;
662 return resource_quota;
// Drops one ref. When it was the last one, asserts that no threads are still
// charged to the quota, then releases the combiner ref, the name, the thread
// count mutex and the quota itself.
665 void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
666 if (gpr_unref(&resource_quota->refs)) {
667 // No outstanding thread quota
668 GPR_ASSERT(resource_quota->num_threads_allocated == 0);
669 GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
670 gpr_free(resource_quota->name);
671 gpr_mu_destroy(&resource_quota->thread_count_mu);
672 gpr_free(resource_quota);
// Same as grpc_resource_quota_unref_internal, but creates an ExecCtx on the
// stack first (presumably so closures scheduled during the unref have an
// ExecCtx to run on - confirm).
677 void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
678 grpc_core::ExecCtx exec_ctx;
679 grpc_resource_quota_unref_internal(resource_quota);
// Takes one ref and returns the same pointer, so the call can be used inline
// in an expression.
682 grpc_resource_quota* grpc_resource_quota_ref_internal(
683 grpc_resource_quota* resource_quota) {
684 gpr_ref(&resource_quota->refs);
685 return resource_quota;
// Takes one ref; the returned pointer of the internal variant is discarded.
689 void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
690 grpc_resource_quota_ref_internal(resource_quota);
// Returns the last stored usage estimate divided by
// MEMORY_USAGE_ESTIMATION_MAX. rq_update_estimate clamps the stored value to
// [0, MEMORY_USAGE_ESTIMATION_MAX], so the result lies in [0, 1]. The value is
// read without a barrier and may be slightly stale.
693 double grpc_resource_quota_get_memory_pressure(
694 grpc_resource_quota* resource_quota) {
695 return (static_cast<double>(gpr_atm_no_barrier_load(
696 &resource_quota->memory_usage_estimation))) /
697 (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
// Replaces the thread limit under thread_count_mu. Asserts that
// new_max_threads is not negative. Threads that are already allocated are not
// touched, so the current count may exceed the new limit after this call.
701 void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
702 int new_max_threads) {
703 GPR_ASSERT(new_max_threads >= 0);
704 gpr_mu_lock(&resource_quota->thread_count_mu);
705 resource_quota->max_threads = new_max_threads;
706 gpr_mu_unlock(&resource_quota->thread_count_mu);
// Requests a new quota size. last_size, the value reported by
// grpc_resource_quota_peek_size, is updated right here, capped at GPR_ATM_MAX.
// The change to size and free_pool is applied later by rq_resize, which
// receives a heap-allocated argument bundle that holds its own quota ref.
// NOTE(review): the closure is initialised with grpc_schedule_on_exec_ctx
// rather than a combiner scheduler, although rq_resize modifies size and
// free_pool - confirm this is intended.
// NOTE(review): the second parameter line is not visible in this view.
710 void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
712 grpc_core::ExecCtx exec_ctx;
713 rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
714 a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
715 a->size = static_cast<int64_t>(size);
716 gpr_atm_no_barrier_store(&resource_quota->last_size,
717 (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
718 GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
719 GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
// Returns the most recently requested size (last_size), read without a
// barrier. Until the first resize this is GPR_ATM_MAX.
722 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
723 return static_cast<size_t>(
724 gpr_atm_no_barrier_load(&resource_quota->last_size));
727 /*******************************************************************************
728 * grpc_resource_user channel args api
 */
// Looks for the GRPC_ARG_RESOURCE_QUOTA entry in channel_args. A pointer-typed
// entry is returned with an extra ref taken. An entry of any other type is
// only logged at debug level. When no usable entry is found, the result is a
// freshly created anonymous quota if `create` is true, otherwise nullptr.
// NOTE(review): the separator between the pointer case and the log call, and
// the closing braces, are not visible in this view.
731 grpc_resource_quota* grpc_resource_quota_from_channel_args(
732 const grpc_channel_args* channel_args, bool create) {
733 for (size_t i = 0; i < channel_args->num_args; i++) {
734 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
735 if (channel_args->args[i].type == GRPC_ARG_POINTER) {
736 return grpc_resource_quota_ref_internal(
737 static_cast<grpc_resource_quota*>(
738 channel_args->args[i].value.pointer.p));
740 gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
744 return create ? grpc_resource_quota_create(nullptr) : nullptr;
// Copy hook of the channel-arg vtable: "copying" a quota just takes a ref.
// NOTE(review): the return statement is not visible in this view; presumably
// the same pointer is returned - confirm.
747 static void* rq_copy(void* rq) {
748 grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
// Destroy hook of the channel-arg vtable: releases the ref taken by rq_copy.
752 static void rq_destroy(void* rq) {
753 grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
// Compare hook of the channel-arg vtable: applies GPR_ICMP to the two pointer
// values themselves, so quotas are ordered by address (identity), not content.
756 static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
// Vtable for carrying a quota in a pointer channel arg; it wires up rq_copy,
// rq_destroy and rq_cmp. The vtable object is a function-local static.
// NOTE(review): the return statement is not visible in this view.
758 const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
759 static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
763 /*******************************************************************************
764 * grpc_resource_user api
 */
// Allocates a resource user attached to resource_quota. The user takes a ref
// on the quota, which ru_destroy releases. It starts with one ref of its own,
// shutdown at 0, an empty free pool, no reclaimers, and is on no list.
// `name` may be nullptr; the user then names itself
// "anonymous_resource_user_<addr>". Returns the new user.
// NOTE(review): one argument line of the first GRPC_CLOSURE_INIT call and a
// few brace lines are not visible in this view.
767 grpc_resource_user* grpc_resource_user_create(
768 grpc_resource_quota* resource_quota, const char* name) {
769 grpc_resource_user* resource_user =
770 static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
771 resource_user->resource_quota =
772 grpc_resource_quota_ref_internal(resource_quota);
// Every closure below is bound to the quota combiner's scheduler.
773 GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
775 grpc_combiner_scheduler(resource_quota->combiner));
776 GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
777 &ru_add_to_free_pool, resource_user,
778 grpc_combiner_scheduler(resource_quota->combiner));
779 GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
780 &ru_post_benign_reclaimer, resource_user,
781 grpc_combiner_scheduler(resource_quota->combiner));
782 GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
783 &ru_post_destructive_reclaimer, resource_user,
784 grpc_combiner_scheduler(resource_quota->combiner));
785 GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
786 grpc_combiner_scheduler(resource_quota->combiner));
787 gpr_mu_init(&resource_user->mu);
788 gpr_atm_rel_store(&resource_user->refs, 1);
789 gpr_atm_rel_store(&resource_user->shutdown, 0);
790 resource_user->free_pool = 0;
791 grpc_closure_list_init(&resource_user->on_allocated);
792 resource_user->allocating = false;
793 resource_user->added_to_free_pool = false;
794 gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
795 resource_user->reclaimers[0] = nullptr;
796 resource_user->reclaimers[1] = nullptr;
797 resource_user->new_reclaimers[0] = nullptr;
798 resource_user->new_reclaimers[1] = nullptr;
799 resource_user->outstanding_allocations = 0;
// nullptr links mean "not on this list" (see rulist_remove).
800 for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
801 resource_user->links[i].next = resource_user->links[i].prev = nullptr;
803 if (name != nullptr) {
804 resource_user->name = gpr_strdup(name);
806 gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
807 (intptr_t)resource_user);
809 return resource_user;
// Returns the quota this user draws from. No ref is taken for the caller.
812 grpc_resource_quota* grpc_resource_user_quota(
813 grpc_resource_user* resource_user) {
814 return resource_user->resource_quota;
// Adds `amount` refs to the user. Asserts that amount is positive and that the
// count was not already zero, i.e. a user whose destruction has been scheduled
// cannot be revived.
817 static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
818 GPR_ASSERT(amount > 0);
819 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
// Drops `amount` refs from the user. Asserts that amount is positive and that
// at least that many refs were held. Callers must therefore never pass 0.
// NOTE(review): the condition that guards the destroy scheduling is not
// visible in this view; presumably destroy_closure is scheduled only when the
// count reaches zero (old == amount) - confirm.
822 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
823 GPR_ASSERT(amount > 0);
824 gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
825 GPR_ASSERT(old >= amount);
827 GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
// Takes one ref on the user; pair with grpc_resource_user_unref.
831 void grpc_resource_user_ref(grpc_resource_user* resource_user) {
832 ru_ref_by(resource_user, 1);
// Drops one ref on the user; see ru_unref_by for what happens on the last one.
835 void grpc_resource_user_unref(grpc_resource_user* resource_user) {
836 ru_unref_by(resource_user, 1);
// Marks the user as shut down. The shutdown counter is incremented on every
// call, but only the first call (previous value 0) goes on to hand ru_shutdown
// to the quota combiner's scheduler.
// NOTE(review): the lines that create and schedule the closure around
// ru_shutdown, and its error argument, are not visible in this view.
839 void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
840 if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
843 ru_shutdown, resource_user,
844 grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
// Tries to reserve thread_count threads from the quota under thread_count_mu.
// The reservation is all or nothing: if the quota's total would exceed
// max_threads, no counter is changed.
// NOTE(review): `num_threads_allocated + thread_count` is an int addition and
// can overflow when both values are large (max_threads defaults to INT_MAX).
// Comparing thread_count against max_threads - num_threads_allocated would
// avoid that.
// NOTE(review): the second parameter line, the remaining fetch_add argument,
// the assignment that sets is_success on success and the return statement are
// not visible in this view.
849 bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
851 GPR_ASSERT(thread_count >= 0);
852 bool is_success = false;
853 gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
854 grpc_resource_quota* rq = resource_user->resource_quota;
855 if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
// Charge the quota and the user together.
856 rq->num_threads_allocated += thread_count;
857 gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
861 gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
// Returns thread_count threads to the quota under thread_count_mu, lowering
// both the quota's and the user's counters. If either counter held fewer than
// thread_count, an error message is formatted with the counts as they were
// before the subtraction.
// NOTE(review): the second parameter line, the logging call head and whatever
// follows the message (presumably an abort) are not visible in this view.
865 void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
867 GPR_ASSERT(thread_count >= 0);
868 gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
869 grpc_resource_quota* rq = resource_user->resource_quota;
870 rq->num_threads_allocated -= thread_count;
// fetch_add returns the user's count before the subtraction.
871 int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
872 &resource_user->num_threads_allocated, -thread_count));
873 if (old_count < thread_count || rq->num_threads_allocated < 0) {
875 "Releasing more threads (%d) than currently allocated (rq threads: "
876 "%d, ru threads: %d)",
877 thread_count, rq->num_threads_allocated + thread_count, old_count);
880 gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
// Core of an allocation; both callers hold resource_user->mu. It takes one
// user ref per byte and subtracts `size` from the user's free pool.
// If the pool goes negative, an optional completion closure is queued on
// on_allocated (and counted in outstanding_allocations), and the allocate
// closure is scheduled unless an allocation round is already pending.
// Otherwise the completion closure is scheduled right away.
// ru_ref_by asserts amount > 0, so a size of 0 trips that assertion.
// NOTE(review): the `size` parameter line, the error argument of the list
// append, the branch separator and the closing braces are not visible in this
// view.
883 static void resource_user_alloc_locked(grpc_resource_user* resource_user,
885 grpc_closure* optional_on_done) {
886 ru_ref_by(resource_user, static_cast<gpr_atm>(size));
887 resource_user->free_pool -= static_cast<int64_t>(size);
888 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
889 gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
890 resource_user->resource_quota->name, resource_user->name, size,
891 resource_user->free_pool);
893 if (resource_user->free_pool < 0) {
894 if (optional_on_done != nullptr) {
895 resource_user->outstanding_allocations += static_cast<int64_t>(size);
896 grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
899 if (!resource_user->allocating) {
900 resource_user->allocating = true;
901 GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
904 GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
908 bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
910 if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
911 gpr_mu_lock(&resource_user->mu);
912 grpc_resource_quota* resource_quota = resource_user->resource_quota;
915 gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
916 gpr_atm new_used = used + size;
917 if (static_cast<size_t>(new_used) >
918 grpc_resource_quota_peek_size(resource_quota)) {
919 gpr_mu_unlock(&resource_user->mu);
922 cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
923 } while (!cas_success);
924 resource_user_alloc_locked(resource_user, size, nullptr);
925 gpr_mu_unlock(&resource_user->mu);
929 void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
930 grpc_closure* optional_on_done) {
931 // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
932 // because some tests become flaky after the change.
933 gpr_mu_lock(&resource_user->mu);
934 grpc_resource_quota* resource_quota = resource_user->resource_quota;
935 gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
936 resource_user_alloc_locked(resource_user, size, optional_on_done);
937 gpr_mu_unlock(&resource_user->mu);
940 void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
941 gpr_mu_lock(&resource_user->mu);
942 grpc_resource_quota* resource_quota = resource_user->resource_quota;
943 gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
944 GPR_ASSERT(prior >= static_cast<long>(size));
945 bool was_zero_or_negative = resource_user->free_pool <= 0;
946 resource_user->free_pool += static_cast<int64_t>(size);
947 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
948 gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
949 resource_user->resource_quota->name, resource_user->name, size,
950 resource_user->free_pool);
952 bool is_bigger_than_zero = resource_user->free_pool > 0;
953 if (is_bigger_than_zero && was_zero_or_negative &&
954 !resource_user->added_to_free_pool) {
955 resource_user->added_to_free_pool = true;
956 GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
959 gpr_mu_unlock(&resource_user->mu);
960 ru_unref_by(resource_user, static_cast<gpr_atm>(size));
963 void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
965 grpc_closure* closure) {
966 GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
967 resource_user->new_reclaimers[destructive] = closure;
968 GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
972 void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
973 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
974 gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
975 resource_user->resource_quota->name, resource_user->name);
978 &resource_user->resource_quota->rq_reclamation_done_closure,
982 void grpc_resource_user_slice_allocator_init(
983 grpc_resource_user_slice_allocator* slice_allocator,
984 grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
985 GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
986 slice_allocator, grpc_schedule_on_exec_ctx);
987 GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
988 grpc_schedule_on_exec_ctx);
989 slice_allocator->resource_user = resource_user;
992 void grpc_resource_user_alloc_slices(
993 grpc_resource_user_slice_allocator* slice_allocator, size_t length,
994 size_t count, grpc_slice_buffer* dest) {
995 if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
997 &slice_allocator->on_allocated,
998 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
1001 slice_allocator->length = length;
1002 slice_allocator->count = count;
1003 slice_allocator->dest = dest;
1004 grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
1005 &slice_allocator->on_allocated);