Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / resolver / dns / c_ares / grpc_ares_ev_driver.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1
22
23 #include <ares.h>
24 #include <string.h>
25
26 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
27
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
31 #include <grpc/support/time.h>
32 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/combiner.h"
35 #include "src/core/lib/iomgr/iomgr_internal.h"
36 #include "src/core/lib/iomgr/sockaddr_utils.h"
37 #include "src/core/lib/iomgr/timer.h"
38
39 typedef struct fd_node {
40   /** the owner of this fd node */
41   grpc_ares_ev_driver* ev_driver;
42   /** a closure wrapping on_readable_locked, which should be
43      invoked when the grpc_fd in this node becomes readable. */
44   grpc_closure read_closure;
45   /** a closure wrapping on_writable_locked, which should be
46      invoked when the grpc_fd in this node becomes writable. */
47   grpc_closure write_closure;
48   /** next fd node in the list */
49   struct fd_node* next;
50
51   /** wrapped fd that's polled by grpc's poller for the current platform */
52   grpc_core::GrpcPolledFd* grpc_polled_fd;
53   /** if the readable closure has been registered */
54   bool readable_registered;
55   /** if the writable closure has been registered */
56   bool writable_registered;
57   /** if the fd has been shutdown yet from grpc iomgr perspective */
58   bool already_shutdown;
59 } fd_node;
60
/** State of one c-ares event driver. An instance is allocated by
    grpc_ares_ev_driver_create_locked and destroyed by
    grpc_ares_ev_driver_unref once its refcount drops to zero. */
struct grpc_ares_ev_driver {
  /** the ares_channel owned by this event driver; it is released with
      ares_destroy when the driver is destroyed */
  ares_channel channel;
  /** pollset set for driving the IO events of the channel */
  grpc_pollset_set* pollset_set;
  /** refcount of the event driver; every registered fd closure and every
      armed alarm holds one ref in addition to the initial one */
  gpr_refcount refs;

  /** combiner to synchronize c-ares and I/O callbacks on; the driver holds
      its own ref on it */
  grpc_combiner* combiner;
  /** head of the list of fd nodes that this event driver is currently
      using */
  fd_node* fds;
  /** is this event driver currently working? Set by
      grpc_ares_ev_driver_start_locked and cleared once the fd list becomes
      empty */
  bool working;
  /** is this event driver being shut down */
  bool shutting_down;
  /** request object that's using this ev driver */
  grpc_ares_request* request;
  /** Owned by the ev_driver. Creates new GrpcPolledFd's */
  grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
  /** query timeout in milliseconds; a value of 0 makes
      grpc_ares_ev_driver_start_locked arm the timeout alarm with
      GRPC_MILLIS_INF_FUTURE */
  int query_timeout_ms;
  /** alarm to cancel active queries */
  grpc_timer query_timeout;
  /** closure run by query_timeout; cancels queries on a timeout */
  grpc_closure on_timeout_locked;
  /** alarm to poll ares_process on in case fd events don't happen */
  grpc_timer ares_backup_poll_alarm;
  /** closure run by ares_backup_poll_alarm; polls ares_process on a periodic
      timer */
  grpc_closure on_ares_backup_poll_alarm_locked;
};
92
// Forward declaration: the alarm and fd callbacks below call this function
// before its definition appears in the file.
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
94
// Takes one additional ref on ev_driver and returns the same pointer. The
// ref is released with grpc_ares_ev_driver_unref.
static grpc_ares_ev_driver* grpc_ares_ev_driver_ref(
    grpc_ares_ev_driver* ev_driver) {
  GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request,
                       ev_driver);
  gpr_ref(&ev_driver->refs);
  return ev_driver;
}
102
103 static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
104   GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request,
105                        ev_driver);
106   if (gpr_unref(&ev_driver->refs)) {
107     GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request,
108                          ev_driver);
109     GPR_ASSERT(ev_driver->fds == nullptr);
110     GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
111     ares_destroy(ev_driver->channel);
112     grpc_ares_complete_request_locked(ev_driver->request);
113     grpc_core::Delete(ev_driver);
114   }
115 }
116
117 static void fd_node_destroy_locked(fd_node* fdn) {
118   GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request,
119                        fdn->grpc_polled_fd->GetName());
120   GPR_ASSERT(!fdn->readable_registered);
121   GPR_ASSERT(!fdn->writable_registered);
122   GPR_ASSERT(fdn->already_shutdown);
123   grpc_core::Delete(fdn->grpc_polled_fd);
124   gpr_free(fdn);
125 }
126
127 static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
128   if (!fdn->already_shutdown) {
129     fdn->already_shutdown = true;
130     fdn->grpc_polled_fd->ShutdownLocked(
131         GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
132   }
133 }
134
135 static void on_timeout_locked(void* arg, grpc_error* error);
136
137 static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error);
138
139 static void noop_inject_channel_config(ares_channel channel) {}
140
141 void (*grpc_ares_test_only_inject_config)(ares_channel channel) =
142     noop_inject_channel_config;
143
144 grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
145                                               grpc_pollset_set* pollset_set,
146                                               int query_timeout_ms,
147                                               grpc_combiner* combiner,
148                                               grpc_ares_request* request) {
149   *ev_driver = grpc_core::New<grpc_ares_ev_driver>();
150   ares_options opts;
151   memset(&opts, 0, sizeof(opts));
152   opts.flags |= ARES_FLAG_STAYOPEN;
153   int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
154   grpc_ares_test_only_inject_config((*ev_driver)->channel);
155   GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request);
156   if (status != ARES_SUCCESS) {
157     char* err_msg;
158     gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
159                  ares_strerror(status));
160     grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg);
161     gpr_free(err_msg);
162     gpr_free(*ev_driver);
163     return err;
164   }
165   (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
166   gpr_ref_init(&(*ev_driver)->refs, 1);
167   (*ev_driver)->pollset_set = pollset_set;
168   (*ev_driver)->fds = nullptr;
169   (*ev_driver)->working = false;
170   (*ev_driver)->shutting_down = false;
171   (*ev_driver)->request = request;
172   (*ev_driver)->polled_fd_factory =
173       grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
174   (*ev_driver)
175       ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
176   GRPC_CLOSURE_INIT(&(*ev_driver)->on_timeout_locked, on_timeout_locked,
177                     *ev_driver, grpc_combiner_scheduler(combiner));
178   GRPC_CLOSURE_INIT(&(*ev_driver)->on_ares_backup_poll_alarm_locked,
179                     on_ares_backup_poll_alarm_locked, *ev_driver,
180                     grpc_combiner_scheduler(combiner));
181   (*ev_driver)->query_timeout_ms = query_timeout_ms;
182   return GRPC_ERROR_NONE;
183 }
184
// Marks ev_driver as shutting down, cancels the query timeout alarm and the
// backup poll alarm, and drops one ref on the driver.
// NOTE(review): judging by the name this is called once all queries issued on
// the driver have finished, and the ref dropped here is presumably the
// initial one taken at creation - the caller is not visible here, confirm.
void grpc_ares_ev_driver_on_queries_complete_locked(
    grpc_ares_ev_driver* ev_driver) {
  // We mark the event driver as being shut down. If the event driver
  // is working, grpc_ares_notify_on_event_locked will shut down the
  // fds; if it's not working, there are no fds to shut down.
  ev_driver->shutting_down = true;
  grpc_timer_cancel(&ev_driver->query_timeout);
  grpc_timer_cancel(&ev_driver->ares_backup_poll_alarm);
  grpc_ares_ev_driver_unref(ev_driver);
}
195
196 void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
197   ev_driver->shutting_down = true;
198   fd_node* fn = ev_driver->fds;
199   while (fn != nullptr) {
200     fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
201     fn = fn->next;
202   }
203 }
204
205 // Search fd in the fd_node list head. This is an O(n) search, the max possible
206 // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
207 static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
208   fd_node dummy_head;
209   dummy_head.next = *head;
210   fd_node* node = &dummy_head;
211   while (node->next != nullptr) {
212     if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) {
213       fd_node* ret = node->next;
214       node->next = node->next->next;
215       *head = dummy_head.next;
216       return ret;
217     }
218     node = node->next;
219   }
220   return nullptr;
221 }
222
223 static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
224     grpc_ares_ev_driver* driver) {
225   // An alternative here could be to use ares_timeout to try to be more
226   // accurate, but that would require using "struct timeval"'s, which just makes
227   // things a bit more complicated. So just poll every second, as suggested
228   // by the c-ares code comments.
229   grpc_millis ms_until_next_ares_backup_poll_alarm = 1000;
230   GRPC_CARES_TRACE_LOG(
231       "request:%p ev_driver=%p. next ares process poll time in "
232       "%" PRId64 " ms",
233       driver->request, driver, ms_until_next_ares_backup_poll_alarm);
234   return ms_until_next_ares_backup_poll_alarm +
235          grpc_core::ExecCtx::Get()->Now();
236 }
237
238 static void on_timeout_locked(void* arg, grpc_error* error) {
239   grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
240   GRPC_CARES_TRACE_LOG(
241       "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. "
242       "err=%s",
243       driver->request, driver, driver->shutting_down, grpc_error_string(error));
244   if (!driver->shutting_down && error == GRPC_ERROR_NONE) {
245     grpc_ares_ev_driver_shutdown_locked(driver);
246   }
247   grpc_ares_ev_driver_unref(driver);
248 }
249
250 /* In case of non-responsive DNS servers, dropped packets, etc., c-ares has
251  * intelligent timeout and retry logic, which we can take advantage of by
252  * polling ares_process_fd on time intervals. Overall, the c-ares library is
253  * meant to be called into and given a chance to proceed name resolution:
254  *   a) when fd events happen
255  *   b) when some time has passed without fd events having happened
256  * For the latter, we use this backup poller. Also see
257  * https://github.com/grpc/grpc/pull/17688 description for more details. */
258 static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) {
259   grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
260   GRPC_CARES_TRACE_LOG(
261       "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. "
262       "driver->shutting_down=%d. "
263       "err=%s",
264       driver->request, driver, driver->shutting_down, grpc_error_string(error));
265   if (!driver->shutting_down && error == GRPC_ERROR_NONE) {
266     fd_node* fdn = driver->fds;
267     while (fdn != nullptr) {
268       if (!fdn->already_shutdown) {
269         GRPC_CARES_TRACE_LOG(
270             "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked; "
271             "ares_process_fd. fd=%s",
272             driver->request, driver, fdn->grpc_polled_fd->GetName());
273         ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
274         ares_process_fd(driver->channel, as, as);
275       }
276       fdn = fdn->next;
277     }
278     if (!driver->shutting_down) {
279       grpc_millis next_ares_backup_poll_alarm =
280           calculate_next_ares_backup_poll_alarm_ms(driver);
281       grpc_ares_ev_driver_ref(driver);
282       grpc_timer_init(&driver->ares_backup_poll_alarm,
283                       next_ares_backup_poll_alarm,
284                       &driver->on_ares_backup_poll_alarm_locked);
285     }
286     grpc_ares_notify_on_event_locked(driver);
287   }
288   grpc_ares_ev_driver_unref(driver);
289 }
290
291 static void on_readable_locked(void* arg, grpc_error* error) {
292   fd_node* fdn = static_cast<fd_node*>(arg);
293   GPR_ASSERT(fdn->readable_registered);
294   grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
295   const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
296   fdn->readable_registered = false;
297   GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request,
298                        fdn->grpc_polled_fd->GetName());
299   if (error == GRPC_ERROR_NONE) {
300     do {
301       ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
302     } while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
303   } else {
304     // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
305     // timed out. The pending lookups made on this ev_driver will be cancelled
306     // by the following ares_cancel() and the on_done callbacks will be invoked
307     // with a status of ARES_ECANCELLED. The remaining file descriptors in this
308     // ev_driver will be cleaned up in the follwing
309     // grpc_ares_notify_on_event_locked().
310     ares_cancel(ev_driver->channel);
311   }
312   grpc_ares_notify_on_event_locked(ev_driver);
313   grpc_ares_ev_driver_unref(ev_driver);
314 }
315
316 static void on_writable_locked(void* arg, grpc_error* error) {
317   fd_node* fdn = static_cast<fd_node*>(arg);
318   GPR_ASSERT(fdn->writable_registered);
319   grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
320   const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
321   fdn->writable_registered = false;
322   GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request,
323                        fdn->grpc_polled_fd->GetName());
324   if (error == GRPC_ERROR_NONE) {
325     ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
326   } else {
327     // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
328     // timed out. The pending lookups made on this ev_driver will be cancelled
329     // by the following ares_cancel() and the on_done callbacks will be invoked
330     // with a status of ARES_ECANCELLED. The remaining file descriptors in this
331     // ev_driver will be cleaned up in the follwing
332     // grpc_ares_notify_on_event_locked().
333     ares_cancel(ev_driver->channel);
334   }
335   grpc_ares_notify_on_event_locked(ev_driver);
336   grpc_ares_ev_driver_unref(ev_driver);
337 }
338
// Returns the address of the ares_channel stored inside ev_driver. The
// pointer refers to a member of the driver, so it is only usable while the
// driver itself is alive.
ares_channel* grpc_ares_ev_driver_get_channel_locked(
    grpc_ares_ev_driver* ev_driver) {
  return &ev_driver->channel;
}
343
344 // Get the file descriptors used by the ev_driver's ares channel, register
345 // driver_closure with these filedescriptors.
346 static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
347   fd_node* new_list = nullptr;
348   if (!ev_driver->shutting_down) {
349     ares_socket_t socks[ARES_GETSOCK_MAXNUM];
350     int socks_bitmask =
351         ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
352     for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
353       if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
354           ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
355         fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
356         // Create a new fd_node if sock[i] is not in the fd_node list.
357         if (fdn == nullptr) {
358           fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
359           fdn->grpc_polled_fd =
360               ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
361                   socks[i], ev_driver->pollset_set, ev_driver->combiner);
362           GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request,
363                                fdn->grpc_polled_fd->GetName());
364           fdn->ev_driver = ev_driver;
365           fdn->readable_registered = false;
366           fdn->writable_registered = false;
367           fdn->already_shutdown = false;
368           GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
369                             grpc_combiner_scheduler(ev_driver->combiner));
370           GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
371                             grpc_combiner_scheduler(ev_driver->combiner));
372         }
373         fdn->next = new_list;
374         new_list = fdn;
375         // Register read_closure if the socket is readable and read_closure has
376         // not been registered with this socket.
377         if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
378             !fdn->readable_registered) {
379           grpc_ares_ev_driver_ref(ev_driver);
380           GRPC_CARES_TRACE_LOG("request:%p notify read on: %s",
381                                ev_driver->request,
382                                fdn->grpc_polled_fd->GetName());
383           fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure);
384           fdn->readable_registered = true;
385         }
386         // Register write_closure if the socket is writable and write_closure
387         // has not been registered with this socket.
388         if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
389             !fdn->writable_registered) {
390           GRPC_CARES_TRACE_LOG("request:%p notify write on: %s",
391                                ev_driver->request,
392                                fdn->grpc_polled_fd->GetName());
393           grpc_ares_ev_driver_ref(ev_driver);
394           fdn->grpc_polled_fd->RegisterForOnWriteableLocked(
395               &fdn->write_closure);
396           fdn->writable_registered = true;
397         }
398       }
399     }
400   }
401   // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
402   // are therefore no longer in use, so they can be shut down and removed from
403   // the list.
404   while (ev_driver->fds != nullptr) {
405     fd_node* cur = ev_driver->fds;
406     ev_driver->fds = ev_driver->fds->next;
407     fd_node_shutdown_locked(cur, "c-ares fd shutdown");
408     if (!cur->readable_registered && !cur->writable_registered) {
409       fd_node_destroy_locked(cur);
410     } else {
411       cur->next = new_list;
412       new_list = cur;
413     }
414   }
415   ev_driver->fds = new_list;
416   // If the ev driver has no working fd, all the tasks are done.
417   if (new_list == nullptr) {
418     ev_driver->working = false;
419     GRPC_CARES_TRACE_LOG("request:%p ev driver stop working",
420                          ev_driver->request);
421   }
422 }
423
424 void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
425   if (!ev_driver->working) {
426     ev_driver->working = true;
427     grpc_ares_notify_on_event_locked(ev_driver);
428     // Initialize overall DNS resolution timeout alarm
429     grpc_millis timeout =
430         ev_driver->query_timeout_ms == 0
431             ? GRPC_MILLIS_INF_FUTURE
432             : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now();
433     GRPC_CARES_TRACE_LOG(
434         "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in "
435         "%" PRId64 " ms",
436         ev_driver->request, ev_driver, timeout);
437     grpc_ares_ev_driver_ref(ev_driver);
438     grpc_timer_init(&ev_driver->query_timeout, timeout,
439                     &ev_driver->on_timeout_locked);
440     // Initialize the backup poll alarm
441     grpc_millis next_ares_backup_poll_alarm =
442         calculate_next_ares_backup_poll_alarm_ms(ev_driver);
443     grpc_ares_ev_driver_ref(ev_driver);
444     grpc_timer_init(&ev_driver->ares_backup_poll_alarm,
445                     next_ares_backup_poll_alarm,
446                     &ev_driver->on_ares_backup_poll_alarm_locked);
447   }
448 }
449
450 #endif /* GRPC_ARES == 1 */