Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / lib / iomgr / tcp_server_posix.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
20 #ifndef _GNU_SOURCE
21 #define _GNU_SOURCE
22 #endif
23
24 #include <grpc/support/port_platform.h>
25
26 #include "src/core/lib/iomgr/port.h"
27
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
29
30 #include <errno.h>
31 #include <fcntl.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <unistd.h>
39
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/string_util.h>
43 #include <grpc/support/sync.h>
44 #include <grpc/support/time.h>
45
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/memory.h"
49 #include "src/core/lib/iomgr/exec_ctx.h"
50 #include "src/core/lib/iomgr/resolve_address.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/sockaddr_utils.h"
53 #include "src/core/lib/iomgr/socket_utils_posix.h"
54 #include "src/core/lib/iomgr/tcp_posix.h"
55 #include "src/core/lib/iomgr/tcp_server.h"
56 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
57 #include "src/core/lib/iomgr/unix_sockets_posix.h"
58
59 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
60                                      const grpc_channel_args* args,
61                                      grpc_tcp_server** server) {
62   grpc_tcp_server* s =
63       static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
64   s->so_reuseport = grpc_is_socket_reuse_port_supported();
65   s->expand_wildcard_addrs = false;
66   for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
67     if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
68       if (args->args[i].type == GRPC_ARG_INTEGER) {
69         s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
70                           (args->args[i].value.integer != 0);
71       } else {
72         gpr_free(s);
73         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
74                                                     " must be an integer");
75       }
76     } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
77       if (args->args[i].type == GRPC_ARG_INTEGER) {
78         s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
79       } else {
80         gpr_free(s);
81         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
82             GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
83       }
84     }
85   }
86   gpr_ref_init(&s->refs, 1);
87   gpr_mu_init(&s->mu);
88   s->active_ports = 0;
89   s->destroyed_ports = 0;
90   s->shutdown = false;
91   s->shutdown_starting.head = nullptr;
92   s->shutdown_starting.tail = nullptr;
93   s->shutdown_complete = shutdown_complete;
94   s->on_accept_cb = nullptr;
95   s->on_accept_cb_arg = nullptr;
96   s->head = nullptr;
97   s->tail = nullptr;
98   s->nports = 0;
99   s->channel_args = grpc_channel_args_copy(args);
100   s->fd_handler = nullptr;
101   gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
102   *server = s;
103   return GRPC_ERROR_NONE;
104 }
105
106 static void finish_shutdown(grpc_tcp_server* s) {
107   gpr_mu_lock(&s->mu);
108   GPR_ASSERT(s->shutdown);
109   gpr_mu_unlock(&s->mu);
110   if (s->shutdown_complete != nullptr) {
111     GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
112   }
113
114   gpr_mu_destroy(&s->mu);
115
116   while (s->head) {
117     grpc_tcp_listener* sp = s->head;
118     s->head = sp->next;
119     gpr_free(sp);
120   }
121   grpc_channel_args_destroy(s->channel_args);
122   grpc_core::Delete(s->fd_handler);
123
124   gpr_free(s);
125 }
126
127 static void destroyed_port(void* server, grpc_error* error) {
128   grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
129   gpr_mu_lock(&s->mu);
130   s->destroyed_ports++;
131   if (s->destroyed_ports == s->nports) {
132     gpr_mu_unlock(&s->mu);
133     finish_shutdown(s);
134   } else {
135     GPR_ASSERT(s->destroyed_ports < s->nports);
136     gpr_mu_unlock(&s->mu);
137   }
138 }
139
140 /* called when all listening endpoints have been shutdown, so no further
141    events will be received on them - at this point it's safe to destroy
142    things */
143 static void deactivated_all_ports(grpc_tcp_server* s) {
144   /* delete ALL the things */
145   gpr_mu_lock(&s->mu);
146
147   GPR_ASSERT(s->shutdown);
148
149   if (s->head) {
150     grpc_tcp_listener* sp;
151     for (sp = s->head; sp; sp = sp->next) {
152       grpc_unlink_if_unix_domain_socket(&sp->addr);
153       GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
154                         grpc_schedule_on_exec_ctx);
155       grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
156                      "tcp_listener_shutdown");
157     }
158     gpr_mu_unlock(&s->mu);
159   } else {
160     gpr_mu_unlock(&s->mu);
161     finish_shutdown(s);
162   }
163 }
164
165 static void tcp_server_destroy(grpc_tcp_server* s) {
166   gpr_mu_lock(&s->mu);
167
168   GPR_ASSERT(!s->shutdown);
169   s->shutdown = true;
170
171   /* shutdown all fd's */
172   if (s->active_ports) {
173     grpc_tcp_listener* sp;
174     for (sp = s->head; sp; sp = sp->next) {
175       grpc_fd_shutdown(
176           sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
177     }
178     gpr_mu_unlock(&s->mu);
179   } else {
180     gpr_mu_unlock(&s->mu);
181     deactivated_all_ports(s);
182   }
183 }
184
185 /* event manager callback when reads are ready */
186 static void on_read(void* arg, grpc_error* err) {
187   grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
188   grpc_pollset* read_notifier_pollset;
189   if (err != GRPC_ERROR_NONE) {
190     goto error;
191   }
192
193   /* loop until accept4 returns EAGAIN, and then re-arm notification */
194   for (;;) {
195     grpc_resolved_address addr;
196     char* addr_str;
197     char* name;
198     memset(&addr, 0, sizeof(addr));
199     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
200     /* Note: If we ever decide to return this address to the user, remember to
201        strip off the ::ffff:0.0.0.0/96 prefix first. */
202     int fd = grpc_accept4(sp->fd, &addr, 1, 1);
203     if (fd < 0) {
204       switch (errno) {
205         case EINTR:
206           continue;
207         case EAGAIN:
208           grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
209           return;
210         default:
211           gpr_mu_lock(&sp->server->mu);
212           if (!sp->server->shutdown_listeners) {
213             gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
214           } else {
215             /* if we have shutdown listeners, accept4 could fail, and we
216                needn't notify users */
217           }
218           gpr_mu_unlock(&sp->server->mu);
219           goto error;
220       }
221     }
222
223     /* For UNIX sockets, the accept call might not fill up the member sun_path
224      * of sockaddr_un, so explicitly call getsockname to get it. */
225     if (grpc_is_unix_socket(&addr)) {
226       memset(&addr, 0, sizeof(addr));
227       addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
228       if (getsockname(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
229                       &(addr.len)) < 0) {
230         gpr_log(GPR_ERROR, "Failed getsockname: %s", strerror(errno));
231         close(fd);
232         goto error;
233       }
234     }
235
236     grpc_set_socket_no_sigpipe_if_possible(fd);
237
238     addr_str = grpc_sockaddr_to_uri(&addr);
239     gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
240
241     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
242       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
243     }
244
245     grpc_fd* fdobj = grpc_fd_create(fd, name, true);
246
247     read_notifier_pollset =
248         sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
249                                  &sp->server->next_pollset_to_assign, 1)) %
250                              sp->server->pollset_count];
251
252     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
253
254     // Create acceptor.
255     grpc_tcp_server_acceptor* acceptor =
256         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
257     acceptor->from_server = sp->server;
258     acceptor->port_index = sp->port_index;
259     acceptor->fd_index = sp->fd_index;
260     acceptor->external_connection = false;
261
262     sp->server->on_accept_cb(
263         sp->server->on_accept_cb_arg,
264         grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
265         read_notifier_pollset, acceptor);
266
267     gpr_free(name);
268     gpr_free(addr_str);
269   }
270
271   GPR_UNREACHABLE_CODE(return );
272
273 error:
274   gpr_mu_lock(&sp->server->mu);
275   if (0 == --sp->server->active_ports && sp->server->shutdown) {
276     gpr_mu_unlock(&sp->server->mu);
277     deactivated_all_ports(sp->server);
278   } else {
279     gpr_mu_unlock(&sp->server->mu);
280   }
281 }
282
283 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
284 static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
285                                                 unsigned port_index,
286                                                 int requested_port,
287                                                 int* out_port) {
288   grpc_resolved_address wild4;
289   grpc_resolved_address wild6;
290   unsigned fd_index = 0;
291   grpc_dualstack_mode dsmode;
292   grpc_tcp_listener* sp = nullptr;
293   grpc_tcp_listener* sp2 = nullptr;
294   grpc_error* v6_err = GRPC_ERROR_NONE;
295   grpc_error* v4_err = GRPC_ERROR_NONE;
296   *out_port = -1;
297
298   if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
299     return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
300                                                out_port);
301   }
302
303   grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
304   /* Try listening on IPv6 first. */
305   if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
306                                          &dsmode, &sp)) == GRPC_ERROR_NONE) {
307     ++fd_index;
308     requested_port = *out_port = sp->port;
309     if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
310       return GRPC_ERROR_NONE;
311     }
312   }
313   /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
314   grpc_sockaddr_set_port(&wild4, requested_port);
315   if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
316                                          &dsmode, &sp2)) == GRPC_ERROR_NONE) {
317     *out_port = sp2->port;
318     if (sp != nullptr) {
319       sp2->is_sibling = 1;
320       sp->sibling = sp2;
321     }
322   }
323   if (*out_port > 0) {
324     if (v6_err != GRPC_ERROR_NONE) {
325       gpr_log(GPR_INFO,
326               "Failed to add :: listener, "
327               "the environment may not support IPv6: %s",
328               grpc_error_string(v6_err));
329       GRPC_ERROR_UNREF(v6_err);
330     }
331     if (v4_err != GRPC_ERROR_NONE) {
332       gpr_log(GPR_INFO,
333               "Failed to add 0.0.0.0 listener, "
334               "the environment may not support IPv4: %s",
335               grpc_error_string(v4_err));
336       GRPC_ERROR_UNREF(v4_err);
337     }
338     return GRPC_ERROR_NONE;
339   } else {
340     grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
341         "Failed to add any wildcard listeners");
342     GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
343     root_err = grpc_error_add_child(root_err, v6_err);
344     root_err = grpc_error_add_child(root_err, v4_err);
345     return root_err;
346   }
347 }
348
349 static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
350   grpc_tcp_listener* sp = nullptr;
351   char* addr_str;
352   char* name;
353   grpc_error* err;
354
355   for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
356     l->fd_index += count;
357   }
358
359   for (unsigned i = 0; i < count; i++) {
360     int fd = -1;
361     int port = -1;
362     grpc_dualstack_mode dsmode;
363     err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
364                                        &fd);
365     if (err != GRPC_ERROR_NONE) return err;
366     err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
367                                          true, &port);
368     if (err != GRPC_ERROR_NONE) return err;
369     listener->server->nports++;
370     grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
371     gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
372     sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
373     sp->next = listener->next;
374     listener->next = sp;
375     /* sp (the new listener) is a sibling of 'listener' (the original
376        listener). */
377     sp->is_sibling = 1;
378     sp->sibling = listener->sibling;
379     listener->sibling = sp;
380     sp->server = listener->server;
381     sp->fd = fd;
382     sp->emfd = grpc_fd_create(fd, name, true);
383     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
384     sp->port = port;
385     sp->port_index = listener->port_index;
386     sp->fd_index = listener->fd_index + count - i;
387     GPR_ASSERT(sp->emfd);
388     while (listener->server->tail->next != nullptr) {
389       listener->server->tail = listener->server->tail->next;
390     }
391     gpr_free(addr_str);
392     gpr_free(name);
393   }
394
395   return GRPC_ERROR_NONE;
396 }
397
398 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
399                                        const grpc_resolved_address* addr,
400                                        int* out_port) {
401   grpc_tcp_listener* sp;
402   grpc_resolved_address sockname_temp;
403   grpc_resolved_address addr6_v4mapped;
404   int requested_port = grpc_sockaddr_get_port(addr);
405   unsigned port_index = 0;
406   grpc_dualstack_mode dsmode;
407   grpc_error* err;
408   *out_port = -1;
409   if (s->tail != nullptr) {
410     port_index = s->tail->port_index + 1;
411   }
412   grpc_unlink_if_unix_domain_socket(addr);
413
414   /* Check if this is a wildcard port, and if so, try to keep the port the same
415      as some previously created listener. */
416   if (requested_port == 0) {
417     for (sp = s->head; sp; sp = sp->next) {
418       sockname_temp.len =
419           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
420       if (0 ==
421           getsockname(sp->fd,
422                       reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
423                       &sockname_temp.len)) {
424         int used_port = grpc_sockaddr_get_port(&sockname_temp);
425         if (used_port > 0) {
426           memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
427           grpc_sockaddr_set_port(&sockname_temp, used_port);
428           requested_port = used_port;
429           addr = &sockname_temp;
430           break;
431         }
432       }
433     }
434   }
435   if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
436     return add_wildcard_addrs_to_server(s, port_index, requested_port,
437                                         out_port);
438   }
439   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
440     addr = &addr6_v4mapped;
441   }
442   if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
443       GRPC_ERROR_NONE) {
444     *out_port = sp->port;
445   }
446   return err;
447 }
448
449 /* Return listener at port_index or NULL. Should only be called with s->mu
450    locked. */
451 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
452                                          unsigned port_index) {
453   unsigned num_ports = 0;
454   grpc_tcp_listener* sp;
455   for (sp = s->head; sp; sp = sp->next) {
456     if (!sp->is_sibling) {
457       if (++num_ports > port_index) {
458         return sp;
459       }
460     }
461   }
462   return nullptr;
463 }
464
465 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
466   unsigned num_fds = 0;
467   gpr_mu_lock(&s->mu);
468   grpc_tcp_listener* sp = get_port_index(s, port_index);
469   for (; sp; sp = sp->sibling) {
470     ++num_fds;
471   }
472   gpr_mu_unlock(&s->mu);
473   return num_fds;
474 }
475
476 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
477                               unsigned fd_index) {
478   gpr_mu_lock(&s->mu);
479   grpc_tcp_listener* sp = get_port_index(s, port_index);
480   for (; sp; sp = sp->sibling, --fd_index) {
481     if (fd_index == 0) {
482       gpr_mu_unlock(&s->mu);
483       return sp->fd;
484     }
485   }
486   gpr_mu_unlock(&s->mu);
487   return -1;
488 }
489
490 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
491                              size_t pollset_count,
492                              grpc_tcp_server_cb on_accept_cb,
493                              void* on_accept_cb_arg) {
494   size_t i;
495   grpc_tcp_listener* sp;
496   GPR_ASSERT(on_accept_cb);
497   gpr_mu_lock(&s->mu);
498   GPR_ASSERT(!s->on_accept_cb);
499   GPR_ASSERT(s->active_ports == 0);
500   s->on_accept_cb = on_accept_cb;
501   s->on_accept_cb_arg = on_accept_cb_arg;
502   s->pollsets = pollsets;
503   s->pollset_count = pollset_count;
504   sp = s->head;
505   while (sp != nullptr) {
506     if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
507         pollset_count > 1) {
508       GPR_ASSERT(GRPC_LOG_IF_ERROR(
509           "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
510       for (i = 0; i < pollset_count; i++) {
511         grpc_pollset_add_fd(pollsets[i], sp->emfd);
512         GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
513                           grpc_schedule_on_exec_ctx);
514         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
515         s->active_ports++;
516         sp = sp->next;
517       }
518     } else {
519       for (i = 0; i < pollset_count; i++) {
520         grpc_pollset_add_fd(pollsets[i], sp->emfd);
521       }
522       GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
523                         grpc_schedule_on_exec_ctx);
524       grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
525       s->active_ports++;
526       sp = sp->next;
527     }
528   }
529   gpr_mu_unlock(&s->mu);
530 }
531
532 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
533   gpr_ref_non_zero(&s->refs);
534   return s;
535 }
536
537 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
538                                              grpc_closure* shutdown_starting) {
539   gpr_mu_lock(&s->mu);
540   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
541                            GRPC_ERROR_NONE);
542   gpr_mu_unlock(&s->mu);
543 }
544
545 static void tcp_server_unref(grpc_tcp_server* s) {
546   if (gpr_unref(&s->refs)) {
547     grpc_tcp_server_shutdown_listeners(s);
548     gpr_mu_lock(&s->mu);
549     GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
550     gpr_mu_unlock(&s->mu);
551     tcp_server_destroy(s);
552   }
553 }
554
555 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
556   gpr_mu_lock(&s->mu);
557   s->shutdown_listeners = true;
558   /* shutdown all fd's */
559   if (s->active_ports) {
560     grpc_tcp_listener* sp;
561     for (sp = s->head; sp; sp = sp->next) {
562       grpc_fd_shutdown(sp->emfd,
563                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
564     }
565   }
566   gpr_mu_unlock(&s->mu);
567 }
568
569 namespace {
570 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
571  public:
572   explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
573
574   // TODO(yangg) resolve duplicate code with on_read
575   void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
576     grpc_pollset* read_notifier_pollset;
577     grpc_resolved_address addr;
578     char* addr_str;
579     char* name;
580     memset(&addr, 0, sizeof(addr));
581     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
582     grpc_core::ExecCtx exec_ctx;
583
584     if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
585                     &(addr.len)) < 0) {
586       gpr_log(GPR_ERROR, "Failed getpeername: %s", strerror(errno));
587       close(fd);
588       return;
589     }
590     grpc_set_socket_no_sigpipe_if_possible(fd);
591     addr_str = grpc_sockaddr_to_uri(&addr);
592     gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
593     if (grpc_tcp_trace.enabled()) {
594       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
595               addr_str);
596     }
597     grpc_fd* fdobj = grpc_fd_create(fd, name, true);
598     read_notifier_pollset =
599         s_->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
600                          &s_->next_pollset_to_assign, 1)) %
601                      s_->pollset_count];
602     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
603     grpc_tcp_server_acceptor* acceptor =
604         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
605     acceptor->from_server = s_;
606     acceptor->port_index = -1;
607     acceptor->fd_index = -1;
608     acceptor->external_connection = true;
609     acceptor->listener_fd = listener_fd;
610     acceptor->pending_data = buf;
611     s_->on_accept_cb(s_->on_accept_cb_arg,
612                      grpc_tcp_create(fdobj, s_->channel_args, addr_str),
613                      read_notifier_pollset, acceptor);
614     gpr_free(name);
615     gpr_free(addr_str);
616   }
617
618  private:
619   grpc_tcp_server* s_;
620 };
621 }  // namespace
622
623 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
624     grpc_tcp_server* s) {
625   s->fd_handler = grpc_core::New<ExternalConnectionHandler>(s);
626   return s->fd_handler;
627 }
628
629 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
630     tcp_server_create,        tcp_server_start,
631     tcp_server_add_port,      tcp_server_create_fd_handler,
632     tcp_server_port_fd_count, tcp_server_port_fd,
633     tcp_server_ref,           tcp_server_shutdown_starting_add,
634     tcp_server_unref,         tcp_server_shutdown_listeners};
635
636 #endif /* GRPC_POSIX_SOCKET_TCP_SERVER */