Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / iomgr / tcp_server_windows.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include "src/core/lib/iomgr/sockaddr.h"
26
27 #include <inttypes.h>
28 #include <io.h>
29
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/iomgr/iocp_windows.h"
39 #include "src/core/lib/iomgr/pollset_windows.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_server.h"
44 #include "src/core/lib/iomgr/tcp_windows.h"
45
46 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
47
48 /* one listening port */
49 typedef struct grpc_tcp_listener grpc_tcp_listener;
50 struct grpc_tcp_listener {
51   /* This seemingly magic number comes from AcceptEx's documentation. each
52      address buffer needs to have at least 16 more bytes at their end. */
53   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
54   /* This will hold the socket for the next accept. */
55   SOCKET new_socket;
56   /* The listener winsocket. */
57   grpc_winsocket* socket;
58   /* The actual TCP port number. */
59   int port;
60   unsigned port_index;
61   grpc_tcp_server* server;
62   /* The cached AcceptEx for that port. */
63   LPFN_ACCEPTEX AcceptEx;
64   int shutting_down;
65   int outstanding_calls;
66   /* closure for socket notification of accept being ready */
67   grpc_closure on_accept;
68   /* linked list */
69   struct grpc_tcp_listener* next;
70 };
71
72 /* the overall server */
73 struct grpc_tcp_server {
74   gpr_refcount refs;
75   /* Called whenever accept() succeeds on a server port. */
76   grpc_tcp_server_cb on_accept_cb;
77   void* on_accept_cb_arg;
78
79   gpr_mu mu;
80
81   /* active port count: how many ports are actually still listening */
82   int active_ports;
83
84   /* linked list of server ports */
85   grpc_tcp_listener* head;
86   grpc_tcp_listener* tail;
87
88   /* List of closures passed to shutdown_starting_add(). */
89   grpc_closure_list shutdown_starting;
90
91   /* shutdown callback */
92   grpc_closure* shutdown_complete;
93
94   grpc_channel_args* channel_args;
95 };
96
97 /* Public function. Allocates the proper data structures to hold a
98    grpc_tcp_server. */
99 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
100                                      const grpc_channel_args* args,
101                                      grpc_tcp_server** server) {
102   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
103   s->channel_args = grpc_channel_args_copy(args);
104   gpr_ref_init(&s->refs, 1);
105   gpr_mu_init(&s->mu);
106   s->active_ports = 0;
107   s->on_accept_cb = NULL;
108   s->on_accept_cb_arg = NULL;
109   s->head = NULL;
110   s->tail = NULL;
111   s->shutdown_starting.head = NULL;
112   s->shutdown_starting.tail = NULL;
113   s->shutdown_complete = shutdown_complete;
114   *server = s;
115   return GRPC_ERROR_NONE;
116 }
117
118 static void destroy_server(void* arg, grpc_error* error) {
119   grpc_tcp_server* s = (grpc_tcp_server*)arg;
120
121   /* Now that the accepts have been aborted, we can destroy the sockets.
122      The IOCP won't get notified on these, so we can flag them as already
123      closed by the system. */
124   while (s->head) {
125     grpc_tcp_listener* sp = s->head;
126     s->head = sp->next;
127     sp->next = NULL;
128     grpc_winsocket_destroy(sp->socket);
129     gpr_free(sp);
130   }
131   grpc_channel_args_destroy(s->channel_args);
132   gpr_mu_destroy(&s->mu);
133   gpr_free(s);
134 }
135
136 static void finish_shutdown_locked(grpc_tcp_server* s) {
137   if (s->shutdown_complete != NULL) {
138     GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
139   }
140
141   GRPC_CLOSURE_SCHED(
142       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
143       GRPC_ERROR_NONE);
144 }
145
146 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
147   gpr_ref_non_zero(&s->refs);
148   return s;
149 }
150
151 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
152                                              grpc_closure* shutdown_starting) {
153   gpr_mu_lock(&s->mu);
154   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
155                            GRPC_ERROR_NONE);
156   gpr_mu_unlock(&s->mu);
157 }
158
159 static void tcp_server_destroy(grpc_tcp_server* s) {
160   grpc_tcp_listener* sp;
161   gpr_mu_lock(&s->mu);
162
163   /* First, shutdown all fd's. This will queue abortion calls for all
164      of the pending accepts due to the normal operation mechanism. */
165   if (s->active_ports == 0) {
166     finish_shutdown_locked(s);
167   } else {
168     for (sp = s->head; sp; sp = sp->next) {
169       sp->shutting_down = 1;
170       grpc_winsocket_shutdown(sp->socket);
171     }
172   }
173   gpr_mu_unlock(&s->mu);
174 }
175
176 static void tcp_server_unref(grpc_tcp_server* s) {
177   if (gpr_unref(&s->refs)) {
178     grpc_tcp_server_shutdown_listeners(s);
179     gpr_mu_lock(&s->mu);
180     GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
181     gpr_mu_unlock(&s->mu);
182     tcp_server_destroy(s);
183   }
184 }
185
186 /* Prepare (bind) a recently-created socket for listening. */
187 static grpc_error* prepare_socket(SOCKET sock,
188                                   const grpc_resolved_address* addr,
189                                   int* port) {
190   grpc_resolved_address sockname_temp;
191   grpc_error* error = GRPC_ERROR_NONE;
192   int sockname_temp_len;
193
194   error = grpc_tcp_prepare_socket(sock);
195   if (error != GRPC_ERROR_NONE) {
196     goto failure;
197   }
198
199   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
200       SOCKET_ERROR) {
201     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
202     goto failure;
203   }
204
205   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
206     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
207     goto failure;
208   }
209
210   sockname_temp_len = sizeof(struct sockaddr_storage);
211   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
212                   &sockname_temp_len) == SOCKET_ERROR) {
213     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
214     goto failure;
215   }
216   sockname_temp.len = (size_t)sockname_temp_len;
217
218   *port = grpc_sockaddr_get_port(&sockname_temp);
219   return GRPC_ERROR_NONE;
220
221 failure:
222   GPR_ASSERT(error != GRPC_ERROR_NONE);
223   char* tgtaddr = grpc_sockaddr_to_uri(addr);
224   grpc_error_set_int(
225       grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
226                              "Failed to prepare server socket", &error, 1),
227                          GRPC_ERROR_STR_TARGET_ADDRESS,
228                          grpc_slice_from_copied_string(tgtaddr)),
229       GRPC_ERROR_INT_FD, (intptr_t)sock);
230   gpr_free(tgtaddr);
231   GRPC_ERROR_UNREF(error);
232   if (sock != INVALID_SOCKET) closesocket(sock);
233   return error;
234 }
235
236 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
237   sp->shutting_down = 0;
238   GPR_ASSERT(sp->server->active_ports > 0);
239   if (0 == --sp->server->active_ports) {
240     finish_shutdown_locked(sp->server);
241   }
242 }
243
244 /* In order to do an async accept, we need to create a socket first which
245    will be the one assigned to the new incoming connection. */
246 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
247   SOCKET sock = INVALID_SOCKET;
248   BOOL success;
249   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
250   DWORD bytes_received = 0;
251   grpc_error* error = GRPC_ERROR_NONE;
252
253   if (port->shutting_down) {
254     return GRPC_ERROR_NONE;
255   }
256
257   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
258                    grpc_get_default_wsa_socket_flags());
259   if (sock == INVALID_SOCKET) {
260     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
261     goto failure;
262   }
263
264   error = grpc_tcp_prepare_socket(sock);
265   if (error != GRPC_ERROR_NONE) goto failure;
266
267   /* Start the "accept" asynchronously. */
268   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
269                            addrlen, addrlen, &bytes_received,
270                            &port->socket->read_info.overlapped);
271
272   /* It is possible to get an accept immediately without delay. However, we
273      will still get an IOCP notification for it. So let's just ignore it. */
274   if (!success) {
275     int last_error = WSAGetLastError();
276     if (last_error != ERROR_IO_PENDING) {
277       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
278       goto failure;
279     }
280   }
281
282   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
283      immediately process an accept that happened in the meantime. */
284   port->new_socket = sock;
285   grpc_socket_notify_on_read(port->socket, &port->on_accept);
286   port->outstanding_calls++;
287   return error;
288
289 failure:
290   GPR_ASSERT(error != GRPC_ERROR_NONE);
291   if (sock != INVALID_SOCKET) closesocket(sock);
292   return error;
293 }
294
295 /* Event manager callback when reads are ready. */
296 static void on_accept(void* arg, grpc_error* error) {
297   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
298   SOCKET sock = sp->new_socket;
299   grpc_winsocket_callback_info* info = &sp->socket->read_info;
300   grpc_endpoint* ep = NULL;
301   grpc_resolved_address peer_name;
302   char* peer_name_string;
303   char* fd_name;
304   DWORD transfered_bytes;
305   DWORD flags;
306   BOOL wsa_success;
307   int err;
308
309   gpr_mu_lock(&sp->server->mu);
310
311   peer_name.len = sizeof(struct sockaddr_storage);
312
313   /* The general mechanism for shutting down is to queue abortion calls. While
314      this is necessary in the read/write case, it's useless for the accept
315      case. We only need to adjust the pending callback count */
316   if (error != GRPC_ERROR_NONE) {
317     const char* msg = grpc_error_string(error);
318     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
319
320     gpr_mu_unlock(&sp->server->mu);
321     return;
322   }
323
324   /* The IOCP notified us of a completed operation. Let's grab the results,
325      and act accordingly. */
326   transfered_bytes = 0;
327   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
328                                        &transfered_bytes, FALSE, &flags);
329   if (!wsa_success) {
330     if (!sp->shutting_down) {
331       char* utf8_message = gpr_format_message(WSAGetLastError());
332       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
333       gpr_free(utf8_message);
334     }
335     closesocket(sock);
336   } else {
337     if (!sp->shutting_down) {
338       peer_name_string = NULL;
339       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
340                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
341       if (err) {
342         char* utf8_message = gpr_format_message(WSAGetLastError());
343         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
344         gpr_free(utf8_message);
345       }
346       int peer_name_len = (int)peer_name.len;
347       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
348       peer_name.len = (size_t)peer_name_len;
349       if (!err) {
350         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
351       } else {
352         char* utf8_message = gpr_format_message(WSAGetLastError());
353         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
354         gpr_free(utf8_message);
355       }
356       gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
357       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
358                            sp->server->channel_args, peer_name_string);
359       gpr_free(fd_name);
360       gpr_free(peer_name_string);
361     } else {
362       closesocket(sock);
363     }
364   }
365
366   /* The only time we should call our callback, is where we successfully
367      managed to accept a connection, and created an endpoint. */
368   if (ep) {
369     // Create acceptor.
370     grpc_tcp_server_acceptor* acceptor =
371         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
372     acceptor->from_server = sp->server;
373     acceptor->port_index = sp->port_index;
374     acceptor->fd_index = 0;
375     acceptor->external_connection = false;
376     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
377   }
378   /* As we were notified from the IOCP of one and exactly one accept,
379      the former socked we created has now either been destroy or assigned
380      to the new connection. We need to create a new one for the next
381      connection. */
382   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
383   if (0 == --sp->outstanding_calls) {
384     decrement_active_ports_and_notify_locked(sp);
385   }
386   gpr_mu_unlock(&sp->server->mu);
387 }
388
389 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
390                                         const grpc_resolved_address* addr,
391                                         unsigned port_index,
392                                         grpc_tcp_listener** listener) {
393   grpc_tcp_listener* sp = NULL;
394   int port = -1;
395   int status;
396   GUID guid = WSAID_ACCEPTEX;
397   DWORD ioctl_num_bytes;
398   LPFN_ACCEPTEX AcceptEx;
399   grpc_error* error = GRPC_ERROR_NONE;
400
401   /* We need to grab the AcceptEx pointer for that port, as it may be
402      interface-dependent. We'll cache it to avoid doing that again. */
403   status =
404       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
405                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
406
407   if (status != 0) {
408     char* utf8_message = gpr_format_message(WSAGetLastError());
409     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
410     gpr_free(utf8_message);
411     closesocket(sock);
412     return GRPC_ERROR_NONE;
413   }
414
415   error = prepare_socket(sock, addr, &port);
416   if (error != GRPC_ERROR_NONE) {
417     return error;
418   }
419
420   GPR_ASSERT(port >= 0);
421   gpr_mu_lock(&s->mu);
422   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
423   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
424   sp->next = NULL;
425   if (s->head == NULL) {
426     s->head = sp;
427   } else {
428     s->tail->next = sp;
429   }
430   s->tail = sp;
431   sp->server = s;
432   sp->socket = grpc_winsocket_create(sock, "listener");
433   sp->shutting_down = 0;
434   sp->outstanding_calls = 0;
435   sp->AcceptEx = AcceptEx;
436   sp->new_socket = INVALID_SOCKET;
437   sp->port = port;
438   sp->port_index = port_index;
439   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
440   GPR_ASSERT(sp->socket);
441   gpr_mu_unlock(&s->mu);
442   *listener = sp;
443
444   return GRPC_ERROR_NONE;
445 }
446
447 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
448                                        const grpc_resolved_address* addr,
449                                        int* port) {
450   grpc_tcp_listener* sp = NULL;
451   SOCKET sock;
452   grpc_resolved_address addr6_v4mapped;
453   grpc_resolved_address wildcard;
454   grpc_resolved_address* allocated_addr = NULL;
455   grpc_resolved_address sockname_temp;
456   unsigned port_index = 0;
457   grpc_error* error = GRPC_ERROR_NONE;
458
459   if (s->tail != NULL) {
460     port_index = s->tail->port_index + 1;
461   }
462
463   /* Check if this is a wildcard port, and if so, try to keep the port the same
464      as some previously created listener. */
465   if (grpc_sockaddr_get_port(addr) == 0) {
466     for (sp = s->head; sp; sp = sp->next) {
467       int sockname_temp_len = sizeof(struct sockaddr_storage);
468       if (0 == getsockname(sp->socket->socket,
469                            (grpc_sockaddr*)sockname_temp.addr,
470                            &sockname_temp_len)) {
471         sockname_temp.len = (size_t)sockname_temp_len;
472         *port = grpc_sockaddr_get_port(&sockname_temp);
473         if (*port > 0) {
474           allocated_addr =
475               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
476           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
477           grpc_sockaddr_set_port(allocated_addr, *port);
478           addr = allocated_addr;
479           break;
480         }
481       }
482     }
483   }
484
485   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
486     addr = &addr6_v4mapped;
487   }
488
489   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
490   if (grpc_sockaddr_is_wildcard(addr, port)) {
491     grpc_sockaddr_make_wildcard6(*port, &wildcard);
492
493     addr = &wildcard;
494   }
495
496   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
497                    grpc_get_default_wsa_socket_flags());
498   if (sock == INVALID_SOCKET) {
499     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
500     goto done;
501   }
502
503   error = add_socket_to_server(s, sock, addr, port_index, &sp);
504
505 done:
506   gpr_free(allocated_addr);
507
508   if (error != GRPC_ERROR_NONE) {
509     grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
510         "Failed to add port to server", &error, 1);
511     GRPC_ERROR_UNREF(error);
512     error = error_out;
513     *port = -1;
514   } else {
515     GPR_ASSERT(sp != NULL);
516     *port = sp->port;
517   }
518   return error;
519 }
520
521 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
522                              size_t pollset_count,
523                              grpc_tcp_server_cb on_accept_cb,
524                              void* on_accept_cb_arg) {
525   grpc_tcp_listener* sp;
526   GPR_ASSERT(on_accept_cb);
527   gpr_mu_lock(&s->mu);
528   GPR_ASSERT(!s->on_accept_cb);
529   GPR_ASSERT(s->active_ports == 0);
530   s->on_accept_cb = on_accept_cb;
531   s->on_accept_cb_arg = on_accept_cb_arg;
532   for (sp = s->head; sp; sp = sp->next) {
533     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
534     s->active_ports++;
535   }
536   gpr_mu_unlock(&s->mu);
537 }
538
539 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
540                                          unsigned port_index) {
541   return 0;
542 }
543
544 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
545                               unsigned fd_index) {
546   return -1;
547 }
548
549 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
550     grpc_tcp_server* s) {
551   return nullptr;
552 }
553
554 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
555
556 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
557     tcp_server_create,        tcp_server_start,
558     tcp_server_add_port,      tcp_server_create_fd_handler,
559     tcp_server_port_fd_count, tcp_server_port_fd,
560     tcp_server_ref,           tcp_server_shutdown_starting_add,
561     tcp_server_unref,         tcp_server_shutdown_listeners};
562 #endif /* GRPC_WINSOCK_SOCKET */