1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #include <grpc/support/log.h>
24
25 /* This polling engine is only relevant on Linux kernels that support epoll
26    via epoll_create() or epoll_create1() */
27 #ifdef GRPC_LINUX_EPOLL
28 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <sys/epoll.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/cpu.h>
43 #include <grpc/support/string_util.h>
44
45 #include "src/core/lib/debug/stats.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gpr/tls.h"
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/iomgr/block_annotate.h"
51 #include "src/core/lib/iomgr/ev_posix.h"
52 #include "src/core/lib/iomgr/iomgr_internal.h"
53 #include "src/core/lib/iomgr/lockfree_event.h"
54 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
55 #include "src/core/lib/profiling/timers.h"
56
57 static grpc_wakeup_fd global_wakeup_fd;
58
59 /*******************************************************************************
60  * Singleton epoll set related fields
61  */
62
63 #define MAX_EPOLL_EVENTS 100
64 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
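/* Descriptive note: process_epoll_events() handles at most this many events per
   call; keeping the value small spreads event processing across successive
   designated pollers rather than letting one thread drain the whole batch (see
   the comment inside pollset_work()). */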
65
66 /* NOTE ON SYNCHRONIZATION:
67  * - Fields in this struct are only modified by the designated poller. Hence
68  *   there is no need for any locks to protect the struct.
69  * - num_events and cursor fields have to be of atomic type to provide memory
70  *   visibility guarantees only: in the case of multiple pollers, the designated
71  *   polling thread keeps changing; the thread that wrote these values may be
72  *   different from the thread reading the values.
73  */
74 typedef struct epoll_set {
75   int epfd;
76
77   /* The epoll_events after the last call to epoll_wait() */
78   struct epoll_event events[MAX_EPOLL_EVENTS];
79
80   /* The number of epoll_events after the last call to epoll_wait() */
81   gpr_atm num_events;
82
83   /* Index of the first event in epoll_events that has to be processed. This
84    * field is only valid if num_events > 0 */
85   gpr_atm cursor;
86 } epoll_set;
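/* Hand-off between the two halves of polling (see do_epoll_wait() and
   process_epoll_events() below): do_epoll_wait() fills 'events', release-stores
   the batch size into 'num_events' and resets 'cursor' to 0; the processing
   side acquire-loads both and advances 'cursor' a few events at a time. */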
87
88 /* The global singleton epoll set */
89 static epoll_set g_epoll_set;
90
91 static int epoll_create_and_cloexec() {
92 #ifdef GRPC_LINUX_EPOLL_CREATE1
93   int fd = epoll_create1(EPOLL_CLOEXEC);
94   if (fd < 0) {
95     gpr_log(GPR_ERROR, "epoll_create1 unavailable");
96   }
97 #else
98   int fd = epoll_create(MAX_EPOLL_EVENTS);
99   if (fd < 0) {
100     gpr_log(GPR_ERROR, "epoll_create unavailable");
101   } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
102     gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
103     return -1;
104   }
105 #endif
106   return fd;
107 }
108
109 /* Must be called *only* once */
110 static bool epoll_set_init() {
111   g_epoll_set.epfd = epoll_create_and_cloexec();
112   if (g_epoll_set.epfd < 0) {
113     return false;
114   }
115
116   gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
117   gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
118   gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
119   return true;
120 }
121
122 /* epoll_set_init() MUST be called before calling this. */
123 static void epoll_set_shutdown() {
124   if (g_epoll_set.epfd >= 0) {
125     close(g_epoll_set.epfd);
126     g_epoll_set.epfd = -1;
127   }
128 }
129
130 /*******************************************************************************
131  * Fd Declarations
132  */
133
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 struct grpc_fork_fd_list {
136   grpc_fd* fd;
137   grpc_fd* next;
138   grpc_fd* prev;
139 };
140
141 struct grpc_fd {
142   int fd;
143
144   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
145   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
146   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
147
148   struct grpc_fd* freelist_next;
149
150   grpc_iomgr_object iomgr_object;
151
152   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
153   grpc_fork_fd_list* fork_fd_list;
154 };
155
156 static void fd_global_init(void);
157 static void fd_global_shutdown(void);
158
159 /*******************************************************************************
160  * Pollset Declarations
161  */
162
163 typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
164
165 static const char* kick_state_string(kick_state st) {
166   switch (st) {
167     case UNKICKED:
168       return "UNKICKED";
169     case KICKED:
170       return "KICKED";
171     case DESIGNATED_POLLER:
172       return "DESIGNATED_POLLER";
173   }
174   GPR_UNREACHABLE_CODE(return "UNKNOWN");
175 }
176
177 struct grpc_pollset_worker {
178   kick_state state;
179   int kick_state_mutator;  // which line of code last changed kick state
180   bool initialized_cv;
181   grpc_pollset_worker* next;
182   grpc_pollset_worker* prev;
183   gpr_cv cv;
184   grpc_closure_list schedule_on_end_work;
185 };
186
187 #define SET_KICK_STATE(worker, kick_state)   \
188   do {                                       \
189     (worker)->state = (kick_state);          \
190     (worker)->kick_state_mutator = __LINE__; \
191   } while (false)
192
193 #define MAX_NEIGHBORHOODS 1024
194
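/* Each neighborhood is padded out to a full cacheline (via the union below),
   presumably so that the per-neighborhood mutexes in the g_neighborhoods array
   do not false-share when contended from different CPUs. */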
195 typedef struct pollset_neighborhood {
196   union {
197     char pad[GPR_CACHELINE_SIZE];
198     struct {
199       gpr_mu mu;
200       grpc_pollset* active_root;
201     };
202   };
203 } pollset_neighborhood;
204
205 struct grpc_pollset {
206   gpr_mu mu;
207   pollset_neighborhood* neighborhood;
208   bool reassigning_neighborhood;
209   grpc_pollset_worker* root_worker;
210   bool kicked_without_poller;
211
212   /* Set to true if the pollset is observed to have no workers available to
213      poll */
214   bool seen_inactive;
215   bool shutting_down;             /* Is the pollset shutting down ? */
216   grpc_closure* shutdown_closure; /* Called after shutdown is complete */
217
218   /* Number of workers who are *about-to* attach themselves to the pollset
219    * worker list */
220   int begin_refs;
221
222   grpc_pollset* next;
223   grpc_pollset* prev;
224 };
225
226 /*******************************************************************************
227  * Pollset-set Declarations
228  */
229
230 struct grpc_pollset_set {
231   char unused;
232 };
233
234 /*******************************************************************************
235  * Common helpers
236  */
237
238 static bool append_error(grpc_error** composite, grpc_error* error,
239                          const char* desc) {
240   if (error == GRPC_ERROR_NONE) return true;
241   if (*composite == GRPC_ERROR_NONE) {
242     *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
243   }
244   *composite = grpc_error_add_child(*composite, error);
245   return false;
246 }
247
248 /*******************************************************************************
249  * Fd Definitions
250  */
251
252 /* We need to keep a freelist not because of any concerns of malloc performance
253  * but instead so that implementations with multiple threads in (for example)
254  * epoll_wait deal with the race between pollset removal and incoming poll
255  * notifications.
256  *
257  * The problem is that the poller ultimately holds a reference to this
258  * object, so it is very difficult to know when it is safe to free it, at least
259  * without some expensive synchronization.
260  *
261  * If we keep the object freelisted, in the worst case losing this race just
262  * becomes a spurious read notification on a reused fd.
263  */
264
265 /* The alarm system needs to be able to wakeup 'some poller' sometimes
266  * (specifically when a new alarm needs to be triggered earlier than the next
267  * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
268  * case occurs. */
269
270 static grpc_fd* fd_freelist = nullptr;
271 static gpr_mu fd_freelist_mu;
272
273 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
274 static grpc_fd* fork_fd_list_head = nullptr;
275 static gpr_mu fork_fd_list_mu;
276
277 static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
278
279 static void fd_global_shutdown(void) {
280   // TODO(guantaol): We don't have a reasonable explanation about this
281   // lock()/unlock() pattern. It can be a valid barrier if there is at most one
282   // pending lock() at this point. Otherwise, there is still a possibility of
283   // use-after-free race. Need to reason about the code and/or clean it up.
284   gpr_mu_lock(&fd_freelist_mu);
285   gpr_mu_unlock(&fd_freelist_mu);
286   while (fd_freelist != nullptr) {
287     grpc_fd* fd = fd_freelist;
288     fd_freelist = fd_freelist->freelist_next;
289     gpr_free(fd);
290   }
291   gpr_mu_destroy(&fd_freelist_mu);
292 }
293
294 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
295   if (grpc_core::Fork::Enabled()) {
296     gpr_mu_lock(&fork_fd_list_mu);
297     fd->fork_fd_list =
298         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
299     fd->fork_fd_list->next = fork_fd_list_head;
300     fd->fork_fd_list->prev = nullptr;
301     if (fork_fd_list_head != nullptr) {
302       fork_fd_list_head->fork_fd_list->prev = fd;
303     }
304     fork_fd_list_head = fd;
305     gpr_mu_unlock(&fork_fd_list_mu);
306   }
307 }
308
309 static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
310   if (grpc_core::Fork::Enabled()) {
311     gpr_mu_lock(&fork_fd_list_mu);
312     if (fork_fd_list_head == fd) {
313       fork_fd_list_head = fd->fork_fd_list->next;
314     }
315     if (fd->fork_fd_list->prev != nullptr) {
316       fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
317     }
318     if (fd->fork_fd_list->next != nullptr) {
319       fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
320     }
321     gpr_free(fd->fork_fd_list);
322     gpr_mu_unlock(&fork_fd_list_mu);
323   }
324 }
325
326 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
327   grpc_fd* new_fd = nullptr;
328
329   gpr_mu_lock(&fd_freelist_mu);
330   if (fd_freelist != nullptr) {
331     new_fd = fd_freelist;
332     fd_freelist = fd_freelist->freelist_next;
333   }
334   gpr_mu_unlock(&fd_freelist_mu);
335
336   if (new_fd == nullptr) {
337     new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
338     new_fd->read_closure.Init();
339     new_fd->write_closure.Init();
340     new_fd->error_closure.Init();
341   }
342   new_fd->fd = fd;
343   new_fd->read_closure->InitEvent();
344   new_fd->write_closure->InitEvent();
345   new_fd->error_closure->InitEvent();
346
347   new_fd->freelist_next = nullptr;
348
349   char* fd_name;
350   gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
351   grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
352   fork_fd_list_add_grpc_fd(new_fd);
353 #ifndef NDEBUG
354   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
355     gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
356   }
357 #endif
358   gpr_free(fd_name);
359
360   struct epoll_event ev;
361   ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
362   /* Use the least significant bit of ev.data.ptr to store track_err. We expect
363    * the addresses to be word aligned. We need to store track_err to avoid
364    * synchronization issues when accessing it after receiving an event.
365    * Accessing fd would be a data race there because the fd might have been
366    * returned to the free list at that point. */
367   ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
368                                         (track_err ? 1 : 0));
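  /* (The tag bit is masked off again in process_epoll_events() before the
     pointer is used as a grpc_fd*.) */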
369   if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
370     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
371   }
372
373   return new_fd;
374 }
375
376 static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
377
378 /* If 'releasing_fd' is true, it means that we are going to detach the internal
379  * fd from the grpc_fd structure (i.e. we should not be calling the shutdown()
380  * syscall on that fd) */
381 static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
382                                  bool releasing_fd) {
383   if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
384     if (!releasing_fd) {
385       shutdown(fd->fd, SHUT_RDWR);
386     } else {
387       /* we need a dummy event for earlier linux versions. */
388       epoll_event dummy_event;
389       if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &dummy_event) !=
390           0) {
391         gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
392       }
393     }
394     fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
395     fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
396   }
397   GRPC_ERROR_UNREF(why);
398 }
399
400 /* Might be called multiple times */
401 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
402   fd_shutdown_internal(fd, why, false);
403 }
404
405 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
406                       const char* reason) {
407   grpc_error* error = GRPC_ERROR_NONE;
408   bool is_release_fd = (release_fd != nullptr);
409
410   if (!fd->read_closure->IsShutdown()) {
411     fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
412                          is_release_fd);
413   }
414
415   /* If release_fd is not NULL, we should be relinquishing control of the file
416      descriptor fd->fd (but we still own the grpc_fd structure). */
417   if (is_release_fd) {
418     *release_fd = fd->fd;
419   } else {
420     close(fd->fd);
421   }
422
423   GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
424
425   grpc_iomgr_unregister_object(&fd->iomgr_object);
426   fork_fd_list_remove_grpc_fd(fd);
427   fd->read_closure->DestroyEvent();
428   fd->write_closure->DestroyEvent();
429   fd->error_closure->DestroyEvent();
430
431   gpr_mu_lock(&fd_freelist_mu);
432   fd->freelist_next = fd_freelist;
433   fd_freelist = fd;
434   gpr_mu_unlock(&fd_freelist_mu);
435 }
436
437 static bool fd_is_shutdown(grpc_fd* fd) {
438   return fd->read_closure->IsShutdown();
439 }
440
441 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
442   fd->read_closure->NotifyOn(closure);
443 }
444
445 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
446   fd->write_closure->NotifyOn(closure);
447 }
448
449 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
450   fd->error_closure->NotifyOn(closure);
451 }
452
453 static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
454
455 static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
456
457 static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
458
459 /*******************************************************************************
460  * Pollset Definitions
461  */
462
463 GPR_TLS_DECL(g_current_thread_pollset);
464 GPR_TLS_DECL(g_current_thread_worker);
465
466 /* The designated poller */
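/* (A value of 0 means "no designated poller"; a worker claims the role by
   CAS-ing its own address into this atom, see begin_worker() and
   check_neighborhood_for_available_poller().) */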
467 static gpr_atm g_active_poller;
468
469 static pollset_neighborhood* g_neighborhoods;
470 static size_t g_num_neighborhoods;
471
472 /* Return true if first in list */
473 static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
474   if (pollset->root_worker == nullptr) {
475     pollset->root_worker = worker;
476     worker->next = worker->prev = worker;
477     return true;
478   } else {
479     worker->next = pollset->root_worker;
480     worker->prev = worker->next->prev;
481     worker->next->prev = worker;
482     worker->prev->next = worker;
483     return false;
484   }
485 }
486
487 /* worker_remove() returns EMPTIED if the removed worker was the last in the list */
488 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
489
490 static worker_remove_result worker_remove(grpc_pollset* pollset,
491                                           grpc_pollset_worker* worker) {
492   if (worker == pollset->root_worker) {
493     if (worker == worker->next) {
494       pollset->root_worker = nullptr;
495       return EMPTIED;
496     } else {
497       pollset->root_worker = worker->next;
498       worker->prev->next = worker->next;
499       worker->next->prev = worker->prev;
500       return NEW_ROOT;
501     }
502   } else {
503     worker->prev->next = worker->next;
504     worker->next->prev = worker->prev;
505     return REMOVED;
506   }
507 }
508
509 static size_t choose_neighborhood(void) {
510   return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
511 }
512
513 static grpc_error* pollset_global_init(void) {
514   gpr_tls_init(&g_current_thread_pollset);
515   gpr_tls_init(&g_current_thread_worker);
516   gpr_atm_no_barrier_store(&g_active_poller, 0);
517   global_wakeup_fd.read_fd = -1;
518   grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
519   if (err != GRPC_ERROR_NONE) return err;
520   struct epoll_event ev;
521   ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
522   ev.data.ptr = &global_wakeup_fd;
523   if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
524                 &ev) != 0) {
525     return GRPC_OS_ERROR(errno, "epoll_ctl");
526   }
527   g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
528   g_neighborhoods = static_cast<pollset_neighborhood*>(
529       gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
530   for (size_t i = 0; i < g_num_neighborhoods; i++) {
531     gpr_mu_init(&g_neighborhoods[i].mu);
532   }
533   return GRPC_ERROR_NONE;
534 }
535
536 static void pollset_global_shutdown(void) {
537   gpr_tls_destroy(&g_current_thread_pollset);
538   gpr_tls_destroy(&g_current_thread_worker);
539   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
540   for (size_t i = 0; i < g_num_neighborhoods; i++) {
541     gpr_mu_destroy(&g_neighborhoods[i].mu);
542   }
543   gpr_free(g_neighborhoods);
544 }
545
546 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
547   gpr_mu_init(&pollset->mu);
548   *mu = &pollset->mu;
549   pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
550   pollset->reassigning_neighborhood = false;
551   pollset->root_worker = nullptr;
552   pollset->kicked_without_poller = false;
553   pollset->seen_inactive = true;
554   pollset->shutting_down = false;
555   pollset->shutdown_closure = nullptr;
556   pollset->begin_refs = 0;
557   pollset->next = pollset->prev = nullptr;
558 }
559
560 static void pollset_destroy(grpc_pollset* pollset) {
561   gpr_mu_lock(&pollset->mu);
562   if (!pollset->seen_inactive) {
563     pollset_neighborhood* neighborhood = pollset->neighborhood;
564     gpr_mu_unlock(&pollset->mu);
565   retry_lock_neighborhood:
566     gpr_mu_lock(&neighborhood->mu);
567     gpr_mu_lock(&pollset->mu);
568     if (!pollset->seen_inactive) {
569       if (pollset->neighborhood != neighborhood) {
570         gpr_mu_unlock(&neighborhood->mu);
571         neighborhood = pollset->neighborhood;
572         gpr_mu_unlock(&pollset->mu);
573         goto retry_lock_neighborhood;
574       }
575       pollset->prev->next = pollset->next;
576       pollset->next->prev = pollset->prev;
577       if (pollset == pollset->neighborhood->active_root) {
578         pollset->neighborhood->active_root =
579             pollset->next == pollset ? nullptr : pollset->next;
580       }
581     }
582     gpr_mu_unlock(&pollset->neighborhood->mu);
583   }
584   gpr_mu_unlock(&pollset->mu);
585   gpr_mu_destroy(&pollset->mu);
586 }
587
588 static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
589   GPR_TIMER_SCOPE("pollset_kick_all", 0);
590   grpc_error* error = GRPC_ERROR_NONE;
591   if (pollset->root_worker != nullptr) {
592     grpc_pollset_worker* worker = pollset->root_worker;
593     do {
594       GRPC_STATS_INC_POLLSET_KICK();
595       switch (worker->state) {
596         case KICKED:
597           GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
598           break;
599         case UNKICKED:
600           SET_KICK_STATE(worker, KICKED);
601           if (worker->initialized_cv) {
602             GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
603             gpr_cv_signal(&worker->cv);
604           }
605           break;
606         case DESIGNATED_POLLER:
607           GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
608           SET_KICK_STATE(worker, KICKED);
609           append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
610                        "pollset_kick_all");
611           break;
612       }
613
614       worker = worker->next;
615     } while (worker != pollset->root_worker);
616   }
617   // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
618   // in the else case
619   return error;
620 }
621
622 static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
623   if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
624       pollset->begin_refs == 0) {
625     GPR_TIMER_MARK("pollset_finish_shutdown", 0);
626     GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
627     pollset->shutdown_closure = nullptr;
628   }
629 }
630
631 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
632   GPR_TIMER_SCOPE("pollset_shutdown", 0);
633   GPR_ASSERT(pollset->shutdown_closure == nullptr);
634   GPR_ASSERT(!pollset->shutting_down);
635   pollset->shutdown_closure = closure;
636   pollset->shutting_down = true;
637   GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
638   pollset_maybe_finish_shutdown(pollset);
639 }
640
641 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
642   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
643   grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
644   if (delta > INT_MAX) {
645     return INT_MAX;
646   } else if (delta < 0) {
647     return 0;
648   } else {
649     return static_cast<int>(delta);
650   }
651 }
652
653 /* Process the epoll events found by do_epoll_wait() function.
654    - g_epoll_set.cursor points to the index of the first event to be processed
655    - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
656      events and updates the g_epoll_set.cursor
657
658    NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
659    called by the g_active_poller thread. So there is no need for synchronization
660    when accessing fields in g_epoll_set */
661 static grpc_error* process_epoll_events(grpc_pollset* pollset) {
662   GPR_TIMER_SCOPE("process_epoll_events", 0);
663
664   static const char* err_desc = "process_events";
665   grpc_error* error = GRPC_ERROR_NONE;
666   long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
667   long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
668   for (int idx = 0;
669        (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
670        idx++) {
671     long c = cursor++;
672     struct epoll_event* ev = &g_epoll_set.events[c];
673     void* data_ptr = ev->data.ptr;
674
675     if (data_ptr == &global_wakeup_fd) {
676       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
677                    err_desc);
678     } else {
679       grpc_fd* fd = reinterpret_cast<grpc_fd*>(
680           reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
681       bool track_err =
682           reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
683       bool cancel = (ev->events & EPOLLHUP) != 0;
684       bool error = (ev->events & EPOLLERR) != 0;
685       bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
686       bool write_ev = (ev->events & EPOLLOUT) != 0;
687       bool err_fallback = error && !track_err;
688
689       if (error && !err_fallback) {
690         fd_has_errors(fd);
691       }
692
693       if (read_ev || cancel || err_fallback) {
694         fd_become_readable(fd);
695       }
696
697       if (write_ev || cancel || err_fallback) {
698         fd_become_writable(fd);
699       }
700     }
701   }
702   gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
703   return error;
704 }
705
706 /* Do epoll_wait and store the events in g_epoll_set.events field. This does not
707    "process" any of the events yet; that is done in process_epoll_events().
708    See process_epoll_events() for more details.
709
710    NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
711    (i.e. the designated poller thread) will be calling this function. So there is
712    no need for any synchronization when accessing fields in g_epoll_set */
713 static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
714   GPR_TIMER_SCOPE("do_epoll_wait", 0);
715
716   int r;
717   int timeout = poll_deadline_to_millis_timeout(deadline);
718   if (timeout != 0) {
719     GRPC_SCHEDULING_START_BLOCKING_REGION;
720   }
721   do {
722     GRPC_STATS_INC_SYSCALL_POLL();
723     r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
724                    timeout);
725   } while (r < 0 && errno == EINTR);
726   if (timeout != 0) {
727     GRPC_SCHEDULING_END_BLOCKING_REGION;
728   }
729
730   if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
731
732   GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);
733
734   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
735     gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
736   }
737
738   gpr_atm_rel_store(&g_epoll_set.num_events, r);
739   gpr_atm_rel_store(&g_epoll_set.cursor, 0);
740
741   return GRPC_ERROR_NONE;
742 }
743
744 static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
745                          grpc_pollset_worker** worker_hdl,
746                          grpc_millis deadline) {
747   GPR_TIMER_SCOPE("begin_worker", 0);
748   if (worker_hdl != nullptr) *worker_hdl = worker;
749   worker->initialized_cv = false;
750   SET_KICK_STATE(worker, UNKICKED);
751   worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
752   pollset->begin_refs++;
753
754   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
755     gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
756   }
757
758   if (pollset->seen_inactive) {
759     // pollset has been observed to be inactive, we need to move back to the
760     // active list
761     bool is_reassigning = false;
762     if (!pollset->reassigning_neighborhood) {
763       is_reassigning = true;
764       pollset->reassigning_neighborhood = true;
765       pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
766     }
767     pollset_neighborhood* neighborhood = pollset->neighborhood;
768     gpr_mu_unlock(&pollset->mu);
769   // pollset unlocked: state may change (even worker->state)
770   retry_lock_neighborhood:
771     gpr_mu_lock(&neighborhood->mu);
772     gpr_mu_lock(&pollset->mu);
773     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
774       gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
775               pollset, worker, kick_state_string(worker->state),
776               is_reassigning);
777     }
778     if (pollset->seen_inactive) {
779       if (neighborhood != pollset->neighborhood) {
780         gpr_mu_unlock(&neighborhood->mu);
781         neighborhood = pollset->neighborhood;
782         gpr_mu_unlock(&pollset->mu);
783         goto retry_lock_neighborhood;
784       }
785
786       /* In the brief time we released the pollset locks above, the worker MAY
787          have been kicked. In this case, the worker should get out of this
788          pollset ASAP and hence this should neither add the pollset to
789          neighborhood nor mark the pollset as active.
790
791          On a side note, the only way a worker's kick state could have changed
792          at this point is if it were "kicked specifically". Since the worker has
793          not added itself to the pollset yet (by calling worker_insert()), it is
794          not visible in the "kick any" path yet */
795       if (worker->state == UNKICKED) {
796         pollset->seen_inactive = false;
797         if (neighborhood->active_root == nullptr) {
798           neighborhood->active_root = pollset->next = pollset->prev = pollset;
799           /* Make this the designated poller if there isn't one already */
800           if (worker->state == UNKICKED &&
801               gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
802             SET_KICK_STATE(worker, DESIGNATED_POLLER);
803           }
804         } else {
805           pollset->next = neighborhood->active_root;
806           pollset->prev = pollset->next->prev;
807           pollset->next->prev = pollset->prev->next = pollset;
808         }
809       }
810     }
811     if (is_reassigning) {
812       GPR_ASSERT(pollset->reassigning_neighborhood);
813       pollset->reassigning_neighborhood = false;
814     }
815     gpr_mu_unlock(&neighborhood->mu);
816   }
817
818   worker_insert(pollset, worker);
819   pollset->begin_refs--;
820   if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
821     GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
822     worker->initialized_cv = true;
823     gpr_cv_init(&worker->cv);
824     while (worker->state == UNKICKED && !pollset->shutting_down) {
825       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
826         gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
827                 pollset, worker, kick_state_string(worker->state),
828                 pollset->shutting_down);
829       }
830
831       if (gpr_cv_wait(&worker->cv, &pollset->mu,
832                       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
833           worker->state == UNKICKED) {
834         /* If gpr_cv_wait returns true (i.e. a timeout), pretend that the worker
835            received a kick */
836         SET_KICK_STATE(worker, KICKED);
837       }
838     }
839     grpc_core::ExecCtx::Get()->InvalidateNow();
840   }
841
842   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
843     gpr_log(GPR_INFO,
844             "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
845             "kicked_without_poller: %d",
846             pollset, worker, kick_state_string(worker->state),
847             pollset->shutting_down, pollset->kicked_without_poller);
848   }
849
850   /* We release pollset lock in this function at a couple of places:
851    *   1. Briefly when assigning pollset to a neighborhood
852    *   2. When doing gpr_cv_wait()
853    * It is possible that 'kicked_without_poller' was set to true during (1) and
854    * 'shutting_down' is set to true during (1) or (2). If either of them is
855    * true, this worker cannot do polling */
856   /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
857    * case; especially when the worker is the DESIGNATED_POLLER */
858
859   if (pollset->kicked_without_poller) {
860     pollset->kicked_without_poller = false;
861     return false;
862   }
863
864   return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
865 }
866
867 static bool check_neighborhood_for_available_poller(
868     pollset_neighborhood* neighborhood) {
869   GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
870   bool found_worker = false;
871   do {
872     grpc_pollset* inspect = neighborhood->active_root;
873     if (inspect == nullptr) {
874       break;
875     }
876     gpr_mu_lock(&inspect->mu);
877     GPR_ASSERT(!inspect->seen_inactive);
878     grpc_pollset_worker* inspect_worker = inspect->root_worker;
879     if (inspect_worker != nullptr) {
880       do {
881         switch (inspect_worker->state) {
882           case UNKICKED:
883             if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
884                                        (gpr_atm)inspect_worker)) {
885               if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
886                 gpr_log(GPR_INFO, " .. choose next poller to be %p",
887                         inspect_worker);
888               }
889               SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
890               if (inspect_worker->initialized_cv) {
891                 GPR_TIMER_MARK("signal worker", 0);
892                 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
893                 gpr_cv_signal(&inspect_worker->cv);
894               }
895             } else {
896               if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
897                 gpr_log(GPR_INFO, " .. beaten to choose next poller");
898               }
899             }
900             // even if we didn't win the cas, there's a worker, we can stop
901             found_worker = true;
902             break;
903           case KICKED:
904             break;
905           case DESIGNATED_POLLER:
906             found_worker = true;  // ok, so someone else found the worker, but
907                                   // we'll accept that
908             break;
909         }
910         inspect_worker = inspect_worker->next;
911       } while (!found_worker && inspect_worker != inspect->root_worker);
912     }
913     if (!found_worker) {
914       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
915         gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
916       }
917       inspect->seen_inactive = true;
918       if (inspect == neighborhood->active_root) {
919         neighborhood->active_root =
920             inspect->next == inspect ? nullptr : inspect->next;
921       }
922       inspect->next->prev = inspect->prev;
923       inspect->prev->next = inspect->next;
924       inspect->next = inspect->prev = nullptr;
925     }
926     gpr_mu_unlock(&inspect->mu);
927   } while (!found_worker);
928   return found_worker;
929 }
930
931 static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
932                        grpc_pollset_worker** worker_hdl) {
933   GPR_TIMER_SCOPE("end_worker", 0);
934   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
935     gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
936   }
937   if (worker_hdl != nullptr) *worker_hdl = nullptr;
938   /* Make sure we appear kicked */
939   SET_KICK_STATE(worker, KICKED);
940   grpc_closure_list_move(&worker->schedule_on_end_work,
941                          grpc_core::ExecCtx::Get()->closure_list());
942   if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
943     if (worker->next != worker && worker->next->state == UNKICKED) {
944       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
945         gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
946       }
947       GPR_ASSERT(worker->next->initialized_cv);
948       gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
949       SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
950       GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
951       gpr_cv_signal(&worker->next->cv);
952       if (grpc_core::ExecCtx::Get()->HasWork()) {
953         gpr_mu_unlock(&pollset->mu);
954         grpc_core::ExecCtx::Get()->Flush();
955         gpr_mu_lock(&pollset->mu);
956       }
957     } else {
958       gpr_atm_no_barrier_store(&g_active_poller, 0);
959       size_t poller_neighborhood_idx =
960           static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
961       gpr_mu_unlock(&pollset->mu);
962       bool found_worker = false;
963       bool scan_state[MAX_NEIGHBORHOODS];
964       for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
965         pollset_neighborhood* neighborhood =
966             &g_neighborhoods[(poller_neighborhood_idx + i) %
967                              g_num_neighborhoods];
968         if (gpr_mu_trylock(&neighborhood->mu)) {
969           found_worker = check_neighborhood_for_available_poller(neighborhood);
970           gpr_mu_unlock(&neighborhood->mu);
971           scan_state[i] = true;
972         } else {
973           scan_state[i] = false;
974         }
975       }
976       for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
977         if (scan_state[i]) continue;
978         pollset_neighborhood* neighborhood =
979             &g_neighborhoods[(poller_neighborhood_idx + i) %
980                              g_num_neighborhoods];
981         gpr_mu_lock(&neighborhood->mu);
982         found_worker = check_neighborhood_for_available_poller(neighborhood);
983         gpr_mu_unlock(&neighborhood->mu);
984       }
985       grpc_core::ExecCtx::Get()->Flush();
986       gpr_mu_lock(&pollset->mu);
987     }
988   } else if (grpc_core::ExecCtx::Get()->HasWork()) {
989     gpr_mu_unlock(&pollset->mu);
990     grpc_core::ExecCtx::Get()->Flush();
991     gpr_mu_lock(&pollset->mu);
992   }
993   if (worker->initialized_cv) {
994     gpr_cv_destroy(&worker->cv);
995   }
996   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
997     gpr_log(GPR_INFO, " .. remove worker");
998   }
999   if (EMPTIED == worker_remove(pollset, worker)) {
1000     pollset_maybe_finish_shutdown(pollset);
1001   }
1002   GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
1003 }
1004
1005 /* The pollset->mu lock must be held by the caller before calling this.
1006    The function pollset_work() may temporarily release the lock (pollset->mu)
1007    during the course of its execution but it will always re-acquire the lock and
1008    ensure that it is held by the time the function returns */
1009 static grpc_error* pollset_work(grpc_pollset* ps,
1010                                 grpc_pollset_worker** worker_hdl,
1011                                 grpc_millis deadline) {
1012   GPR_TIMER_SCOPE("pollset_work", 0);
1013   grpc_pollset_worker worker;
1014   grpc_error* error = GRPC_ERROR_NONE;
1015   static const char* err_desc = "pollset_work";
1016   if (ps->kicked_without_poller) {
1017     ps->kicked_without_poller = false;
1018     return GRPC_ERROR_NONE;
1019   }
1020
1021   if (begin_worker(ps, &worker, worker_hdl, deadline)) {
1022     gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1023     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1024     GPR_ASSERT(!ps->shutting_down);
1025     GPR_ASSERT(!ps->seen_inactive);
1026
1027     gpr_mu_unlock(&ps->mu); /* unlock */
1028     /* This is the designated polling thread at this point and should ideally do
1029        polling. However, if there are unprocessed events left from a previous
1030        call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
1031        process the pending epoll events.
1032
1033        The reason for decoupling do_epoll_wait and process_epoll_events is to
1034        better distribute the work (i.e. handling epoll events) across multiple
1035        threads
1036
1037        process_epoll_events() returns very quickly: It just queues the work on
1038        exec_ctx but does not execute it (the actual execution or more
1039        accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
1040        AFTER selecting a designated poller). So we are not waiting long periods
1041        without a designated poller */
1042     if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
1043         gpr_atm_acq_load(&g_epoll_set.num_events)) {
1044       append_error(&error, do_epoll_wait(ps, deadline), err_desc);
1045     }
1046     append_error(&error, process_epoll_events(ps), err_desc);
1047
1048     gpr_mu_lock(&ps->mu); /* lock */
1049
1050     gpr_tls_set(&g_current_thread_worker, 0);
1051   } else {
1052     gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1053   }
1054   end_worker(ps, &worker, worker_hdl);
1055
1056   gpr_tls_set(&g_current_thread_pollset, 0);
1057   return error;
1058 }
1059
1060 static grpc_error* pollset_kick(grpc_pollset* pollset,
1061                                 grpc_pollset_worker* specific_worker) {
1062   GPR_TIMER_SCOPE("pollset_kick", 0);
1063   GRPC_STATS_INC_POLLSET_KICK();
1064   grpc_error* ret_err = GRPC_ERROR_NONE;
1065   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1066     gpr_strvec log;
1067     gpr_strvec_init(&log);
1068     char* tmp;
1069     gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
1070                  specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
1071                  (void*)gpr_tls_get(&g_current_thread_worker),
1072                  pollset->root_worker);
1073     gpr_strvec_add(&log, tmp);
1074     if (pollset->root_worker != nullptr) {
1075       gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
1076                    kick_state_string(pollset->root_worker->state),
1077                    pollset->root_worker->next,
1078                    kick_state_string(pollset->root_worker->next->state));
1079       gpr_strvec_add(&log, tmp);
1080     }
1081     if (specific_worker != nullptr) {
1082       gpr_asprintf(&tmp, " worker_kick_state=%s",
1083                    kick_state_string(specific_worker->state));
1084       gpr_strvec_add(&log, tmp);
1085     }
1086     tmp = gpr_strvec_flatten(&log, nullptr);
1087     gpr_strvec_destroy(&log);
1088     gpr_log(GPR_DEBUG, "%s", tmp);
1089     gpr_free(tmp);
1090   }
1091
1092   if (specific_worker == nullptr) {
1093     if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
1094       grpc_pollset_worker* root_worker = pollset->root_worker;
1095       if (root_worker == nullptr) {
1096         GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
1097         pollset->kicked_without_poller = true;
1098         if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1099           gpr_log(GPR_INFO, " .. kicked_without_poller");
1100         }
1101         goto done;
1102       }
1103       grpc_pollset_worker* next_worker = root_worker->next;
1104       if (root_worker->state == KICKED) {
1105         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1106         if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1107           gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
1108         }
1109         SET_KICK_STATE(root_worker, KICKED);
1110         goto done;
1111       } else if (next_worker->state == KICKED) {
1112         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1113         if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1114           gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
1115         }
1116         SET_KICK_STATE(next_worker, KICKED);
1117         goto done;
1118       } else if (root_worker ==
1119                      next_worker &&  // only try and wake up a poller if
1120                                      // there is no next worker
1121                  root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
1122                                     &g_active_poller)) {
1123         GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1124         if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1125           gpr_log(GPR_INFO, " .. kicked %p", root_worker);
1126         }
1127         SET_KICK_STATE(root_worker, KICKED);
1128         ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1129         goto done;
1130       } else if (next_worker->state == UNKICKED) {
1131         GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1132         if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1133           gpr_log(GPR_INFO, " .. kicked %p", next_worker);
1134         }
1135         GPR_ASSERT(next_worker->initialized_cv);
1136         SET_KICK_STATE(next_worker, KICKED);
1137         gpr_cv_signal(&next_worker->cv);
1138         goto done;
1139       } else if (next_worker->state == DESIGNATED_POLLER) {
1140         if (root_worker->state != DESIGNATED_POLLER) {
1141           if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1142             gpr_log(
1143                 GPR_INFO,
1144                 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
1145                 root_worker, root_worker->initialized_cv, next_worker);
1146           }
1147           SET_KICK_STATE(root_worker, KICKED);
1148           if (root_worker->initialized_cv) {
1149             GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1150             gpr_cv_signal(&root_worker->cv);
1151           }
1152           goto done;
1153         } else {
1154           GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1155           if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1156             gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
1157                     root_worker);
1158           }
1159           SET_KICK_STATE(next_worker, KICKED);
1160           ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1161           goto done;
1162         }
1163       } else {
1164         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1165         GPR_ASSERT(next_worker->state == KICKED);
1166         SET_KICK_STATE(next_worker, KICKED);
1167         goto done;
1168       }
1169     } else {
1170       GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1171       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1172         gpr_log(GPR_INFO, " .. kicked while waking up");
1173       }
1174       goto done;
1175     }
1176
1177     GPR_UNREACHABLE_CODE(goto done);
1178   }
1179
1180   if (specific_worker->state == KICKED) {
1181     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1182       gpr_log(GPR_INFO, " .. specific worker already kicked");
1183     }
1184     goto done;
1185   } else if (gpr_tls_get(&g_current_thread_worker) ==
1186              (intptr_t)specific_worker) {
1187     GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1188     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1189       gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
1190     }
1191     SET_KICK_STATE(specific_worker, KICKED);
1192     goto done;
1193   } else if (specific_worker ==
1194              (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
1195     GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1196     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1197       gpr_log(GPR_INFO, " .. kick active poller");
1198     }
1199     SET_KICK_STATE(specific_worker, KICKED);
1200     ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1201     goto done;
1202   } else if (specific_worker->initialized_cv) {
1203     GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1204     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1205       gpr_log(GPR_INFO, " .. kick waiting worker");
1206     }
1207     SET_KICK_STATE(specific_worker, KICKED);
1208     gpr_cv_signal(&specific_worker->cv);
1209     goto done;
1210   } else {
1211     GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1212     if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1213       gpr_log(GPR_INFO, " .. kick non-waiting worker");
1214     }
1215     SET_KICK_STATE(specific_worker, KICKED);
1216     goto done;
1217   }
1218 done:
1219   return ret_err;
1220 }
1221
1222 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}
1223
1224 /*******************************************************************************
1225  * Pollset-set Definitions
1226  */
1227
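/* Pollset_sets are no-ops in this engine: every fd is added to the single
   global epoll set when it is created, so there is nothing extra to track per
   pollset_set. pollset_set_create() just returns a non-null sentinel pointer. */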
1228 static grpc_pollset_set* pollset_set_create(void) {
1229   return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
1230 }
1231
1232 static void pollset_set_destroy(grpc_pollset_set* pss) {}
1233
1234 static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1235
1236 static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1237
1238 static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1239
1240 static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1241
1242 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1243                                         grpc_pollset_set* item) {}
1244
1245 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1246                                         grpc_pollset_set* item) {}
1247
1248 /*******************************************************************************
1249  * Event engine binding
1250  */
1251
1252 static bool is_any_background_poller_thread(void) { return false; }
1253
1254 static void shutdown_background_closure(void) {}
1255
1256 static bool add_closure_to_background_poller(grpc_closure* closure,
1257                                              grpc_error* error) {
1258   return false;
1259 }
1260
1261 static void shutdown_engine(void) {
1262   fd_global_shutdown();
1263   pollset_global_shutdown();
1264   epoll_set_shutdown();
1265   if (grpc_core::Fork::Enabled()) {
1266     gpr_mu_destroy(&fork_fd_list_mu);
1267     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1268   }
1269 }
1270
1271 static const grpc_event_engine_vtable vtable = {
1272     sizeof(grpc_pollset),
1273     true,
1274     false,
1275
1276     fd_create,
1277     fd_wrapped_fd,
1278     fd_orphan,
1279     fd_shutdown,
1280     fd_notify_on_read,
1281     fd_notify_on_write,
1282     fd_notify_on_error,
1283     fd_become_readable,
1284     fd_become_writable,
1285     fd_has_errors,
1286     fd_is_shutdown,
1287
1288     pollset_init,
1289     pollset_shutdown,
1290     pollset_destroy,
1291     pollset_work,
1292     pollset_kick,
1293     pollset_add_fd,
1294
1295     pollset_set_create,
1296     pollset_set_destroy,
1297     pollset_set_add_pollset,
1298     pollset_set_del_pollset,
1299     pollset_set_add_pollset_set,
1300     pollset_set_del_pollset_set,
1301     pollset_set_add_fd,
1302     pollset_set_del_fd,
1303
1304     is_any_background_poller_thread,
1305     shutdown_background_closure,
1306     shutdown_engine,
1307     add_closure_to_background_poller,
1308 };
1309
1310 /* Called by the child process's post-fork handler to close open fds, including
1311  * the global epoll fd. This allows gRPC to shut down in the child process
1312  * without interfering with connections or RPCs ongoing in the parent. */
1313 static void reset_event_manager_on_fork() {
1314   gpr_mu_lock(&fork_fd_list_mu);
1315   while (fork_fd_list_head != nullptr) {
1316     close(fork_fd_list_head->fd);
1317     fork_fd_list_head->fd = -1;
1318     fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
1319   }
1320   gpr_mu_unlock(&fork_fd_list_mu);
1321   shutdown_engine();
1322   grpc_init_epoll1_linux(true);
1323 }
1324
1325 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1326  * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1327  * support is available */
1328 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1329   if (!grpc_has_wakeup_fd()) {
1330     gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
1331     return nullptr;
1332   }
1333
1334   if (!epoll_set_init()) {
1335     return nullptr;
1336   }
1337
1338   fd_global_init();
1339
1340   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1341     fd_global_shutdown();
1342     epoll_set_shutdown();
1343     return nullptr;
1344   }
1345
1346   if (grpc_core::Fork::Enabled()) {
1347     gpr_mu_init(&fork_fd_list_mu);
1348     grpc_core::Fork::SetResetChildPollingEngineFunc(
1349         reset_event_manager_on_fork);
1350   }
1351   return &vtable;
1352 }
1353
1354 #else /* defined(GRPC_LINUX_EPOLL) */
1355 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1356 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
1357 /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1358  * NULL */
1359 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1360   return nullptr;
1361 }
1362 #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
1363 #endif /* !defined(GRPC_LINUX_EPOLL) */