Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / lib / iomgr / tcp_client_posix.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
24
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
26
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <string.h>
30 #include <unistd.h>
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/iomgr_posix.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_posix.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/iomgr/unix_sockets_posix.h"
48 #include "src/core/lib/slice/slice_internal.h"
49
50 extern grpc_core::TraceFlag grpc_tcp_trace;
51
52 typedef struct {
53   gpr_mu mu;
54   grpc_fd* fd;
55   grpc_timer alarm;
56   grpc_closure on_alarm;
57   int refs;
58   grpc_closure write_closure;
59   grpc_pollset_set* interested_parties;
60   char* addr_str;
61   grpc_endpoint** ep;
62   grpc_closure* closure;
63   grpc_channel_args* channel_args;
64 } async_connect;
65
66 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
67                                   const grpc_channel_args* channel_args) {
68   grpc_error* err = GRPC_ERROR_NONE;
69
70   GPR_ASSERT(fd >= 0);
71
72   err = grpc_set_socket_nonblocking(fd, 1);
73   if (err != GRPC_ERROR_NONE) goto error;
74   err = grpc_set_socket_cloexec(fd, 1);
75   if (err != GRPC_ERROR_NONE) goto error;
76   if (!grpc_is_unix_socket(addr)) {
77     err = grpc_set_socket_low_latency(fd, 1);
78     if (err != GRPC_ERROR_NONE) goto error;
79     err = grpc_set_socket_reuse_addr(fd, 1);
80     if (err != GRPC_ERROR_NONE) goto error;
81     err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
82                                            true /* is_client */);
83     if (err != GRPC_ERROR_NONE) goto error;
84   }
85   err = grpc_set_socket_no_sigpipe_if_possible(fd);
86   if (err != GRPC_ERROR_NONE) goto error;
87   if (channel_args) {
88     for (size_t i = 0; i < channel_args->num_args; i++) {
89       if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
90         GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
91         grpc_socket_mutator* mutator = static_cast<grpc_socket_mutator*>(
92             channel_args->args[i].value.pointer.p);
93         err = grpc_set_socket_with_mutator(fd, mutator);
94         if (err != GRPC_ERROR_NONE) goto error;
95       }
96     }
97   }
98   goto done;
99
100 error:
101   if (fd >= 0) {
102     close(fd);
103   }
104 done:
105   return err;
106 }
107
108 static void tc_on_alarm(void* acp, grpc_error* error) {
109   int done;
110   async_connect* ac = static_cast<async_connect*>(acp);
111   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
112     const char* str = grpc_error_string(error);
113     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
114             str);
115   }
116   gpr_mu_lock(&ac->mu);
117   if (ac->fd != nullptr) {
118     grpc_fd_shutdown(
119         ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
120   }
121   done = (--ac->refs == 0);
122   gpr_mu_unlock(&ac->mu);
123   if (done) {
124     gpr_mu_destroy(&ac->mu);
125     gpr_free(ac->addr_str);
126     grpc_channel_args_destroy(ac->channel_args);
127     gpr_free(ac);
128   }
129 }
130
131 grpc_endpoint* grpc_tcp_client_create_from_fd(
132     grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
133   return grpc_tcp_create(fd, channel_args, addr_str);
134 }
135
136 static void on_writable(void* acp, grpc_error* error) {
137   async_connect* ac = static_cast<async_connect*>(acp);
138   int so_error = 0;
139   socklen_t so_error_size;
140   int err;
141   int done;
142   grpc_endpoint** ep = ac->ep;
143   grpc_closure* closure = ac->closure;
144   grpc_fd* fd;
145
146   GRPC_ERROR_REF(error);
147
148   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
149     const char* str = grpc_error_string(error);
150     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s", ac->addr_str,
151             str);
152   }
153
154   gpr_mu_lock(&ac->mu);
155   GPR_ASSERT(ac->fd);
156   fd = ac->fd;
157   ac->fd = nullptr;
158   gpr_mu_unlock(&ac->mu);
159
160   grpc_timer_cancel(&ac->alarm);
161
162   gpr_mu_lock(&ac->mu);
163   if (error != GRPC_ERROR_NONE) {
164     error =
165         grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
166                            grpc_slice_from_static_string("Timeout occurred"));
167     goto finish;
168   }
169
170   do {
171     so_error_size = sizeof(so_error);
172     err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
173                      &so_error_size);
174   } while (err < 0 && errno == EINTR);
175   if (err < 0) {
176     error = GRPC_OS_ERROR(errno, "getsockopt");
177     goto finish;
178   }
179
180   switch (so_error) {
181     case 0:
182       grpc_pollset_set_del_fd(ac->interested_parties, fd);
183       *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str);
184       fd = nullptr;
185       break;
186     case ENOBUFS:
187       /* We will get one of these errors if we have run out of
188          memory in the kernel for the data structures allocated
189          when you connect a socket.  If this happens it is very
190          likely that if we wait a little bit then try again the
191          connection will work (since other programs or this
192          program will close their network connections and free up
193          memory).  This does _not_ indicate that there is anything
194          wrong with the server we are connecting to, this is a
195          local problem.
196
197          If you are looking at this code, then chances are that
198          your program or another program on the same computer
199          opened too many network connections.  The "easy" fix:
200          don't do that! */
201       gpr_log(GPR_ERROR, "kernel out of buffers");
202       gpr_mu_unlock(&ac->mu);
203       grpc_fd_notify_on_write(fd, &ac->write_closure);
204       return;
205     case ECONNREFUSED:
206       /* This error shouldn't happen for anything other than connect(). */
207       error = GRPC_OS_ERROR(so_error, "connect");
208       break;
209     default:
210       /* We don't really know which syscall triggered the problem here,
211          so punt by reporting getsockopt(). */
212       error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
213       break;
214   }
215
216 finish:
217   if (fd != nullptr) {
218     grpc_pollset_set_del_fd(ac->interested_parties, fd);
219     grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
220     fd = nullptr;
221   }
222   done = (--ac->refs == 0);
223   // Create a copy of the data from "ac" to be accessed after the unlock, as
224   // "ac" and its contents may be deallocated by the time they are read.
225   const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
226   gpr_mu_unlock(&ac->mu);
227   if (error != GRPC_ERROR_NONE) {
228     char* error_descr;
229     grpc_slice str;
230     bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
231     GPR_ASSERT(ret);
232     char* desc = grpc_slice_to_c_string(str);
233     gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc);
234     error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
235                                grpc_slice_from_copied_string(error_descr));
236     gpr_free(error_descr);
237     gpr_free(desc);
238     error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
239                                addr_str_slice /* takes ownership */);
240   } else {
241     grpc_slice_unref_internal(addr_str_slice);
242   }
243   if (done) {
244     // This is safe even outside the lock, because "done", the sentinel, is
245     // populated *inside* the lock.
246     gpr_mu_destroy(&ac->mu);
247     gpr_free(ac->addr_str);
248     grpc_channel_args_destroy(ac->channel_args);
249     gpr_free(ac);
250   }
251   GRPC_CLOSURE_SCHED(closure, error);
252 }
253
254 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
255                                        const grpc_resolved_address* addr,
256                                        grpc_resolved_address* mapped_addr,
257                                        grpc_fd** fdobj) {
258   grpc_dualstack_mode dsmode;
259   int fd;
260   grpc_error* error;
261   char* name;
262   char* addr_str;
263   *fdobj = nullptr;
264   /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
265      v6. */
266   if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
267     /* addr is v4 mapped to v6 or v6. */
268     memcpy(mapped_addr, addr, sizeof(*mapped_addr));
269   }
270   error =
271       grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
272   if (error != GRPC_ERROR_NONE) {
273     return error;
274   }
275   if (dsmode == GRPC_DSMODE_IPV4) {
276     /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
277     if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
278       memcpy(mapped_addr, addr, sizeof(*mapped_addr));
279     }
280   }
281   if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
282       GRPC_ERROR_NONE) {
283     return error;
284   }
285   addr_str = grpc_sockaddr_to_uri(mapped_addr);
286   gpr_asprintf(&name, "tcp-client:%s", addr_str);
287   *fdobj = grpc_fd_create(fd, name, true);
288   gpr_free(name);
289   gpr_free(addr_str);
290   return GRPC_ERROR_NONE;
291 }
292
293 void grpc_tcp_client_create_from_prepared_fd(
294     grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
295     const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
296     grpc_millis deadline, grpc_endpoint** ep) {
297   const int fd = grpc_fd_wrapped_fd(fdobj);
298   int err;
299   async_connect* ac;
300   do {
301     err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
302                   addr->len);
303   } while (err < 0 && errno == EINTR);
304   if (err >= 0) {
305     char* addr_str = grpc_sockaddr_to_uri(addr);
306     *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
307     gpr_free(addr_str);
308     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
309     return;
310   }
311   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
312     grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
313     GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
314     return;
315   }
316
317   grpc_pollset_set_add_fd(interested_parties, fdobj);
318
319   ac = static_cast<async_connect*>(gpr_malloc(sizeof(async_connect)));
320   ac->closure = closure;
321   ac->ep = ep;
322   ac->fd = fdobj;
323   ac->interested_parties = interested_parties;
324   ac->addr_str = grpc_sockaddr_to_uri(addr);
325   gpr_mu_init(&ac->mu);
326   ac->refs = 2;
327   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
328                     grpc_schedule_on_exec_ctx);
329   ac->channel_args = grpc_channel_args_copy(channel_args);
330
331   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
332     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
333             ac->addr_str, fdobj);
334   }
335
336   gpr_mu_lock(&ac->mu);
337   GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
338   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
339   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
340   gpr_mu_unlock(&ac->mu);
341 }
342
343 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
344                         grpc_pollset_set* interested_parties,
345                         const grpc_channel_args* channel_args,
346                         const grpc_resolved_address* addr,
347                         grpc_millis deadline) {
348   grpc_resolved_address mapped_addr;
349   grpc_fd* fdobj = nullptr;
350   grpc_error* error;
351   *ep = nullptr;
352   if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
353                                           &fdobj)) != GRPC_ERROR_NONE) {
354     GRPC_CLOSURE_SCHED(closure, error);
355     return;
356   }
357   grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
358                                           channel_args, &mapped_addr, deadline,
359                                           ep);
360 }
361
362 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
363 #endif