/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <grpc/support/log.h>

/* This polling engine is only relevant on Linux kernels supporting epoll
   (epoll_create() or epoll_create1()). */
#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"

static grpc_wakeup_fd global_wakeup_fd;

/*******************************************************************************
 * Singleton epoll set related fields
 */

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1

/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor fields have to be of atomic type to provide memory
 *   visibility guarantees only. i.e., in case of multiple pollers, the
 *   designated polling thread keeps changing; the thread that wrote these
 *   values may be different from the thread reading them. */

typedef struct epoll_set {
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;

/* The global singleton epoll set */
static epoll_set g_epoll_set;

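/* Illustrative sketch (not part of the original source): the handoff protocol
   between do_epoll_wait() and process_epoll_events() below. The designated
   poller publishes a fresh batch with release stores, and a (possibly
   different) later designated poller observes it with acquire loads:

     // producer side, in do_epoll_wait():
     gpr_atm_rel_store(&g_epoll_set.num_events, r);
     gpr_atm_rel_store(&g_epoll_set.cursor, 0);

     // consumer side, in process_epoll_events(), possibly another thread:
     long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
     long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);

   Only memory visibility is required; mutual exclusion follows from the fact
   that a single designated poller touches these fields at any given time. */
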
static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    return -1;
  }
#endif
  return fd;
}

/* Must be called *only* once */
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

/* epoll_set_init() MUST be called before calling this. */
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

/*******************************************************************************
 * Fd Declarations
 */

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
struct grpc_fork_fd_list {
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)

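/* Illustrative note (not in the original source): SET_KICK_STATE is a macro
   rather than a function so that __LINE__ expands to the *caller's* line.
   After, e.g.,

     SET_KICK_STATE(worker, KICKED);

   worker->kick_state_mutator records the line of that call site, which helps
   attribute the last kick-state transition when debugging from a core dump. */
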
#define MAX_NEIGHBORHOODS 1024

typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;

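/* Illustrative note (not in the original source): the anonymous union pads
   each neighborhood out to a cache line so that the per-neighborhood mutexes
   do not false-share across cores. A hypothetical build-time check of that
   intent could read:

     static_assert(sizeof(pollset_neighborhood) >= GPR_CACHELINE_SIZE,
                   "a neighborhood should occupy at least one cache line");
*/
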
struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down? */
  grpc_closure* shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {
  char unused;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error** composite, grpc_error* error,
                         const char* desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

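/* Illustrative usage sketch (not from the original source): callers thread one
   composite error through several fallible steps, e.g.

     grpc_error* error = GRPC_ERROR_NONE;
     static const char* err_desc = "some_operation";  // hypothetical
     append_error(&error, step_one(), err_desc);      // hypothetical steps
     append_error(&error, step_two(), err_desc);
     return error;  // GRPC_ERROR_NONE iff every step succeeded

   The first failure creates the composite from 'desc'; subsequent failures are
   attached to it as children. */
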
/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;

  char* fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  }
#endif
  gpr_free(fd_name);

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  /* Use the least significant bit of ev.data.ptr to store track_err. We expect
   * the addresses to be word aligned. We need to store track_err to avoid
   * synchronization issues when accessing it after receiving an event.
   * Accessing fd would be a data race there because the fd might have been
   * returned to the free list at that point. */
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}

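/* Illustrative sketch (not part of the original source): recovering the
   grpc_fd* and the track_err flag from ev.data.ptr, assuming grpc_fd
   allocations are at least 2-byte aligned so bit 0 is free to use as a tag:

     intptr_t tagged = reinterpret_cast<intptr_t>(ev.data.ptr);
     grpc_fd* fd =
         reinterpret_cast<grpc_fd*>(tagged & ~static_cast<intptr_t>(1));
     bool track_err = (tagged & 1) != 0;

   process_epoll_events() below performs exactly this decoding. */
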
static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

/* if 'releasing_fd' is true, it means that we are going to detach the internal
 * fd from grpc_fd structure (i.e., which means we should not be calling
 * shutdown() syscall on that fd) */
static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    } else {
      /* we need a dummy event for earlier linux versions. */
      epoll_event dummy_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &dummy_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
      }
    }
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  fd_shutdown_internal(fd, why, false);
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error* error = GRPC_ERROR_NONE;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
                         is_release_fd);
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

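/* Illustrative sketch (not from the original source): how the NotifyOn/SetReady
   halves of a LockfreeEvent pair up. A transport registers interest with

     fd_notify_on_read(fd, &on_readable);  // 'on_readable' is hypothetical

   while the designated poller, on seeing EPOLLIN for this fd, calls
   fd_become_readable(fd). Whichever side arrives second schedules the closure;
   the LockfreeEvent makes that exchange safe without any per-fd lock. */
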
/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

/* The designated poller */
static gpr_atm g_active_poller;

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

/* Return true if first in list */
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Result of removing a worker from its pollset's worker list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}

static grpc_error* pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
  GPR_TIMER_SCOPE("pollset_kick_all", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      GRPC_STATS_INC_POLLSET_KICK();
      switch (worker->state) {
        case KICKED:
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }
      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the case of root_worker == NULL (i.e., a pollset with no workers)
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
    GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
  grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}

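/* Worked example (illustrative, not in the original source): with Now() at t,
   a deadline of t + 2500ms yields 2500; a deadline already in the past yields
   0 (poll without blocking); GRPC_MILLIS_INF_FUTURE yields -1, which
   epoll_wait() interprets as "block indefinitely"; deltas larger than INT_MAX
   are clamped to INT_MAX. */
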
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
     events and updates the g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by the g_active_poller thread. So there is no need for synchronization
   when accessing fields in g_epoll_set */
static grpc_error* process_epoll_events(grpc_pollset* pollset) {
  GPR_TIMER_SCOPE("process_epoll_events", 0);

  static const char* err_desc = "process_events";
  grpc_error* error = GRPC_ERROR_NONE;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
      bool track_err =
          reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

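/* Illustrative note (not in the original source): because
   MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION is 1, each pass drains at most one
   event before the worker returns and a new designated poller is chosen in
   end_worker(). This intentionally spreads the processing of a large
   epoll_wait() batch (up to MAX_EPOLL_EVENTS entries) across many threads
   rather than having one thread process the whole batch. */
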
/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
   "process" any of the events yet; that is done in process_epoll_events().
   See process_epoll_events() for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e., the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
  GPR_TIMER_SCOPE("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL();
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return GRPC_ERROR_NONE;
}

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_millis deadline) {
  GPR_TIMER_SCOPE("begin_worker", 0);
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e., a timeout), pretend that the
           worker received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release pollset lock in this function at a couple of places:
   *   1. Briefly when assigning pollset to a neighborhood
   *   2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle the
   * kicked_without_poller case; especially when the worker is the
   * DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}

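/* Illustrative summary (not in the original source) of the kick-state machine
   driven by begin_worker()/end_worker()/pollset_kick():

     UNKICKED --(won g_active_poller CAS)------------> DESIGNATED_POLLER
     UNKICKED --(pollset_kick or cv-wait timeout)----> KICKED
     DESIGNATED_POLLER --(pollset_kick, wakeup_fd)---> KICKED

   begin_worker() returns true only for a DESIGNATED_POLLER on a pollset that
   is not shutting down; every other worker blocks on its condition variable
   until it is kicked, promoted, or times out. */
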
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                       (gpr_atm)inspect_worker)) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that we should have been able
                                  // to do so
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GPR_TIMER_SCOPE("end_worker", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

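/* Illustrative note (not in the original source): when the departing designated
   poller has no UNKICKED peer on its own pollset, end_worker() searches for a
   successor in two passes over the neighborhoods, starting from its own: first
   with gpr_mu_trylock() to skip contended neighborhoods cheaply, then a
   blocking pass over only the neighborhoods the first pass skipped (tracked in
   scan_state). */
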
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error* pollset_work(grpc_pollset* ps,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_pollset_worker worker;
  grpc_error* error = GRPC_ERROR_NONE;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally do
       polling. However, if there are unprocessed events left from a previous
       call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
       process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e., handling epoll events) across multiple
       threads.

       process_epoll_events() returns very quickly: It just queues the work on
       the exec_ctx but does not execute it (the actual execution or, more
       accurately, grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
       AFTER selecting a designated poller). So we are not waiting long periods
       without a designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

static grpc_error* pollset_kick(grpc_pollset* pollset,
                                grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK();
  grpc_error* ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_strvec log;
    gpr_strvec_init(&log);
    char* tmp;
    gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
                 specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
                 (void*)gpr_tls_get(&g_current_thread_worker),
                 pollset->root_worker);
    gpr_strvec_add(&log, tmp);
    if (pollset->root_worker != nullptr) {
      gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                   kick_state_string(pollset->root_worker->state),
                   pollset->root_worker->next,
                   kick_state_string(pollset->root_worker->next->state));
      gpr_strvec_add(&log, tmp);
    }
    if (specific_worker != nullptr) {
      gpr_asprintf(&tmp, " worker_kick_state=%s",
                   kick_state_string(specific_worker->state));
      gpr_strvec_add(&log, tmp);
    }
    tmp = gpr_strvec_flatten(&log, nullptr);
    gpr_strvec_destroy(&log);
    gpr_log(GPR_DEBUG, "%s", tmp);
    gpr_free(tmp);
  }

  if (specific_worker == nullptr) {
    if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker ==
                     next_worker &&  // only try and wake up a poller if
                                     // there is no next worker
                 root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
                                    &g_active_poller)) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }

done:
  return ret_err;
}

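/* Illustrative decision summary for pollset_kick() (not in the original
   source). With specific_worker == nullptr: an empty pollset merely records
   kicked_without_poller; a lone worker that is also the designated poller is
   woken through global_wakeup_fd; an UNKICKED next worker is signalled on its
   condition variable; already-KICKED workers are left alone. With a specific
   worker: the active poller is woken via the wakeup fd, a cv-waiting worker
   via gpr_cv_signal(), and anything else is simply marked KICKED. */
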
static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set* pollset_set_create(void) {
  return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
}

static void pollset_set_destroy(grpc_pollset_set* pss) {}

static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}

static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}

static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}

static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {}

/*******************************************************************************
 * Event engine binding
 */

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* closure,
                                             grpc_error* error) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
}

static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    shutdown_background_closure,
    shutdown_engine,
    add_closure_to_background_poller,
};

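/* Illustrative note (not in the original source): iomgr picks a polling engine
   by invoking each candidate engine's init function and keeping the first
   non-null vtable. For this engine that entry point is grpc_init_epoll1_linux()
   below, which returns &vtable only once the epoll fd, the fd freelist and the
   pollset globals have all been initialized successfully. */
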
/* Called by the child process's post-fork handler to close open fds, including
 * the global epoll fd. This allows gRPC to shut down in the child process
 * without interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  shutdown_engine();
  grpc_init_epoll1_linux(true);
}

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
 * support is available */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return nullptr;
  }

  if (!epoll_set_init()) {
    return nullptr;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return nullptr;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * nullptr */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
  return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */