legacy-libs/grpc/deps/grpc/src/core/lib/iomgr/executor.cc
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/executor.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"

#define MAX_DEPTH 2

#define EXECUTOR_TRACE(format, ...)                       \
  do {                                                    \
    if (GRPC_TRACE_FLAG_ENABLED(executor_trace)) {        \
      gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
    }                                                     \
  } while (0)

#define EXECUTOR_TRACE0(str)                       \
  do {                                             \
    if (GRPC_TRACE_FLAG_ENABLED(executor_trace)) { \
      gpr_log(GPR_INFO, "EXECUTOR " str);          \
    }                                              \
  } while (0)
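
// A hedged sketch of what these macros produce: with the "executor" trace
// flag enabled (e.g. via GRPC_TRACE=executor), a call such as
// EXECUTOR_TRACE("(%s) step", name) expands to roughly:
//
//   if (GRPC_TRACE_FLAG_ENABLED(executor_trace)) {
//     gpr_log(GPR_INFO, "EXECUTOR (%s) step", name);
//   }
//
// i.e. every executor log line carries the "EXECUTOR " prefix.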

namespace grpc_core {
namespace {

GPR_TLS_DECL(g_this_thread_state);

Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];

void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
      closure, error, true /* is_short */);
}

void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
      closure, error, false /* is_short */);
}

void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
      closure, error, true /* is_short */);
}

void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
      closure, error, false /* is_short */);
}

const grpc_closure_scheduler_vtable
    vtables_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
            [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
                {{&default_enqueue_short, &default_enqueue_short,
                  "def-ex-short"},
                 {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
                {{&resolver_enqueue_short, &resolver_enqueue_short,
                  "res-ex-short"},
                 {&resolver_enqueue_long, &resolver_enqueue_long,
                  "res-ex-long"}}};

grpc_closure_scheduler
    schedulers_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
               [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] = {
                   {{&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
                              [static_cast<size_t>(ExecutorJobType::SHORT)]},
                    {&vtables_[static_cast<size_t>(ExecutorType::DEFAULT)]
                              [static_cast<size_t>(ExecutorJobType::LONG)]}},
                   {{&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
                              [static_cast<size_t>(ExecutorJobType::SHORT)]},
                    {&vtables_[static_cast<size_t>(ExecutorType::RESOLVER)]
                              [static_cast<size_t>(ExecutorJobType::LONG)]}}};
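
// Indexing sketch for the tables above: both are laid out as
// [executor_type][job_type], so, for example,
//
//   &schedulers_[static_cast<size_t>(ExecutorType::RESOLVER)]
//               [static_cast<size_t>(ExecutorJobType::LONG)]
//
// is the scheduler whose vtable routes closures through
// resolver_enqueue_long (named "res-ex-long" in trace output).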

}  // namespace

TraceFlag executor_trace(false, "executor");

Executor::Executor(const char* name) : name_(name) {
  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
  gpr_atm_rel_store(&num_threads_, 0);
  max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}

void Executor::Init() { SetThreading(true); }

size_t Executor::RunClosures(const char* executor_name,
                             grpc_closure_list list) {
  size_t n = 0;

  // In the executor, the ExecCtx for the thread is declared in the executor
  // thread itself, but this is the point where we could start seeing
  // application-level callbacks. No need to create a new ExecCtx, though,
  // since there already is one and it is flushed (but not destructed) in this
  // function itself. The ApplicationCallbackExecCtx will have its callbacks
  // invoked on its destruction, which will be after completing any closures in
  // the executor's closure list (which were explicitly scheduled onto the
  // executor).
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  grpc_closure* c = list.head;
  while (c != nullptr) {
    grpc_closure* next = c->next_data.next;
    grpc_error* error = c->error_data.error;
#ifndef NDEBUG
    EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
                   c->file_created, c->line_created);
    c->scheduled = false;
#else
    EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
    c->cb(c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
    grpc_core::ExecCtx::Get()->Flush();
  }

  return n;
}

bool Executor::IsThreaded() const {
  return gpr_atm_acq_load(&num_threads_) > 0;
}

void Executor::SetThreading(bool threading) {
  gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
  EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);

  if (threading) {
    if (curr_num_threads > 0) {
      EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads > 0", name_);
      return;
    }

    GPR_ASSERT(num_threads_ == 0);
    gpr_atm_rel_store(&num_threads_, 1);
    gpr_tls_init(&g_this_thread_state);
    thd_state_ = static_cast<ThreadState*>(
        gpr_zalloc(sizeof(ThreadState) * max_threads_));

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_init(&thd_state_[i].mu);
      gpr_cv_init(&thd_state_[i].cv);
      thd_state_[i].id = i;
      thd_state_[i].name = name_;
      thd_state_[i].thd = grpc_core::Thread();
      thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    thd_state_[0].thd =
        grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
    thd_state_[0].thd.Start();
  } else {  // !threading
    if (curr_num_threads == 0) {
      EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
      return;
    }

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_lock(&thd_state_[i].mu);
      thd_state_[i].shutdown = true;
      gpr_cv_signal(&thd_state_[i].cv);
      gpr_mu_unlock(&thd_state_[i].mu);
    }

    /* Ensure no thread is adding a new thread. Once this is past, then no
     * thread will try to add a new one either (since shutdown is true) */
    gpr_spinlock_lock(&adding_thread_lock_);
    gpr_spinlock_unlock(&adding_thread_lock_);

    curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
    for (gpr_atm i = 0; i < curr_num_threads; i++) {
      thd_state_[i].thd.Join();
      EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
                     i + 1, curr_num_threads);
    }

    gpr_atm_rel_store(&num_threads_, 0);
    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_destroy(&thd_state_[i].mu);
      gpr_cv_destroy(&thd_state_[i].cv);
      RunClosures(thd_state_[i].name, thd_state_[i].elems);
    }

    gpr_free(thd_state_);
    gpr_tls_destroy(&g_this_thread_state);

    // grpc_iomgr_shutdown_background_closure() will close all the registered
    // fds in the background poller, and wait for all pending closures to
    // finish. Thus, never call Executor::SetThreading(false) in the middle of
    // an application.
    // TODO(guantaol): create another method to finish all the pending closures
    // registered in the background poller by grpc_core::Executor.
    grpc_iomgr_shutdown_background_closure();
  }

  EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}
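
// Lifecycle sketch (an assumed, typical sequence; the instance name is
// hypothetical):
//
//   grpc_core::Executor ex("my-executor");
//   ex.Init();      // SetThreading(true): allocates thd_state_ and starts
//                   // thread 0; further threads are spawned on demand by
//                   // Enqueue().
//   /* ... Enqueue() closures ... */
//   ex.Shutdown();  // SetThreading(false): joins every started thread, then
//                   // drains any leftover closures inline via RunClosures().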

void Executor::Shutdown() { SetThreading(false); }

void Executor::ThreadMain(void* arg) {
  ThreadState* ts = static_cast<ThreadState*>(arg);
  gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));

  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  size_t subtract_depth = 0;
  for (;;) {
    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
                   ts->name, ts->id, subtract_depth);

    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    // Wait for closures to be enqueued or for the executor to be shut down
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }

    if (ts->shutdown) {
      EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
      gpr_mu_unlock(&ts->mu);
      break;
    }

    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    grpc_closure_list closures = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);

    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = RunClosures(ts->name, closures);
  }
}
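
// Note on depth accounting (descriptive only): RunClosures() returns how
// many closures it ran, and ThreadMain() subtracts that from ts->depth under
// ts->mu on the next loop iteration. This keeps the backlog heuristic that
// Enqueue() reads (ts->depth > MAX_DEPTH) roughly accurate without holding
// the lock while closures execute.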

void Executor::Enqueue(grpc_closure* closure, grpc_error* error,
                       bool is_short) {
  bool retry_push;
  if (is_short) {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
  } else {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
  }

  do {
    retry_push = false;
    size_t cur_thread_count =
        static_cast<size_t>(gpr_atm_acq_load(&num_threads_));

    // If the number of threads is zero (i.e. either the executor is not
    // threaded or has already been shut down), queue the closure on the exec
    // context itself
    if (cur_thread_count == 0) {
#ifndef NDEBUG
      EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
                     closure->file_created, closure->line_created);
#else
      EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
      grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                               closure, error);
      return;
    }

    if (grpc_iomgr_add_closure_to_background_poller(closure, error)) {
      return;
    }

    ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state);
    if (ts == nullptr) {
      ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
                                        cur_thread_count)];
    } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }

    ThreadState* orig_ts = ts;
    bool try_new_thread = false;

    for (;;) {
#ifndef NDEBUG
      EXECUTOR_TRACE(
          "(%s) try to schedule %p (%s) (created %s:%d) to thread "
          "%" PRIdPTR,
          name_, closure, is_short ? "short" : "long", closure->file_created,
          closure->line_created, ts->id);
#else
      EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
                     closure, is_short ? "short" : "long", ts->id);
#endif

      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // If there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation). Spin through queues and try again
        gpr_mu_unlock(&ts->mu);
        size_t idx = ts->id;
        ts = &thd_state_[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          // We cycled through all the threads. Retry the enqueue by creating
          // a new thread
          //
          // TODO (sreek): There is a potential issue here. We are
          // unconditionally setting try_new_thread to true here. What if the
          // executor is shut down OR if cur_thread_count is already equal to
          // max_threads?
          // (Fortunately, this is not an issue yet (as of July 2018) because
          // there is only one instance of a long job in gRPC and hence we
          // will not hit this code path)
          retry_push = true;
          try_new_thread = true;
          break;
        }

        continue;  // Try the next thread-state
      }

      // == Found the thread state (i.e. thread) to enqueue this closure! ==

      // Also, if this thread has been waiting for closures, wake it up.
      // - If grpc_closure_list_empty() is true and the Executor is not
      //   shutdown, it means that the thread must be waiting in ThreadMain()
      // - Note that gpr_cv_signal() won't immediately wake up the thread. That
      //   happens after we release the mutex &ts->mu a few lines below
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
        gpr_cv_signal(&ts->cv);
      }

      grpc_closure_list_append(&ts->elems, closure, error);

      // If we have already queued more than MAX_DEPTH closures on this
      // thread, use this as a hint to create more threads
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < max_threads_ && !ts->shutdown;

      ts->queued_long_job = !is_short;

      gpr_mu_unlock(&ts->mu);
      break;
    }

    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
      cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
      if (cur_thread_count < max_threads_) {
        // Increment num_threads (safe to do a store instead of a cas because
        // we always increment num_threads under the 'adding_thread_lock')
        gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);

        thd_state_[cur_thread_count].thd = grpc_core::Thread(
            name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
        thd_state_[cur_thread_count].thd.Start();
      }
      gpr_spinlock_unlock(&adding_thread_lock_);
    }

    if (retry_push) {
      GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
    }
  } while (retry_push);
}
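
// Enqueue() decision path, summarized (descriptive only, mirroring the code
// above):
//   1. No threads running (unthreaded or already shut down): append the
//      closure to the caller's ExecCtx closure list and run it there.
//   2. The background poller accepts the closure: done.
//   3. Otherwise, round-robin over thd_state_[], skipping any thread that
//      holds a queued long job; once a thread's backlog exceeds MAX_DEPTH
//      (and the pool is below max_threads_), start another thread.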

// Executor::InitAll() and Executor::ShutdownAll() functions are called in
// the grpc_init() and grpc_shutdown() code paths, which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe
void Executor::InitAll() {
  EXECUTOR_TRACE0("Executor::InitAll() enter");

  // Return if Executor::InitAll() was already called earlier
  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
    GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] !=
               nullptr);
    return;
  }

  executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
      grpc_core::New<Executor>("default-executor");
  executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
      grpc_core::New<Executor>("resolver-executor");

  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();

  EXECUTOR_TRACE0("Executor::InitAll() done");
}

grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type,
                                            ExecutorJobType job_type) {
  return &schedulers_[static_cast<size_t>(executor_type)]
                     [static_cast<size_t>(job_type)];
}

grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) {
  return Executor::Scheduler(ExecutorType::DEFAULT, job_type);
}
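
// A hedged usage sketch (GRPC_CLOSURE_INIT / GRPC_CLOSURE_SCHED as in this
// tree's closure.h of the same vintage; my_closure, my_cb, and my_arg are
// hypothetical names):
//
//   GRPC_CLOSURE_INIT(&my_closure, my_cb, my_arg,
//                     grpc_core::Executor::Scheduler(
//                         grpc_core::ExecutorJobType::SHORT));
//   GRPC_CLOSURE_SCHED(&my_closure, GRPC_ERROR_NONE);
//
// my_cb then runs on a default-executor thread, or inline on the caller's
// ExecCtx if the executor is not threaded.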

void Executor::ShutdownAll() {
  EXECUTOR_TRACE0("Executor::ShutdownAll() enter");

  // Return if Executor::ShutdownAll() was already called earlier
  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
    GPR_ASSERT(executors[static_cast<size_t>(ExecutorType::RESOLVER)] ==
               nullptr);
    return;
  }

  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();

  // Delete the executor objects.
  //
  // NOTE: It is important to call Shutdown() on all executors first before
  // calling Delete() because it is possible for one executor (that is not
  // shut down yet) to call Enqueue() on a different executor which is already
  // shut down. This is legal and in such cases, the Enqueue() operation
  // effectively "fails" and enqueues that closure on the calling thread's
  // exec_ctx.
  //
  // By ensuring that all executors are shut down first, we are also ensuring
  // that no thread is active across all executors.

  grpc_core::Delete<Executor>(
      executors[static_cast<size_t>(ExecutorType::DEFAULT)]);
  grpc_core::Delete<Executor>(
      executors[static_cast<size_t>(ExecutorType::RESOLVER)]);
  executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
  executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;

  EXECUTOR_TRACE0("Executor::ShutdownAll() done");
}

bool Executor::IsThreaded(ExecutorType executor_type) {
  GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
  return executors[static_cast<size_t>(executor_type)]->IsThreaded();
}

bool Executor::IsThreadedDefault() {
  return Executor::IsThreaded(ExecutorType::DEFAULT);
}

void Executor::SetThreadingAll(bool enable) {
  EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable);
  for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
       i++) {
    executors[i]->SetThreading(enable);
  }
}

void Executor::SetThreadingDefault(bool enable) {
  EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable);
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
}

}  // namespace grpc_core