3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
23 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
25 #include "src/core/lib/iomgr/tcp_client_posix.h"
28 #include <netinet/in.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/string_util.h>
35 #include <grpc/support/time.h>
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/iomgr/ev_posix.h"
40 #include "src/core/lib/iomgr/iomgr_posix.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/iomgr/socket_mutator.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "src/core/lib/iomgr/tcp_posix.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/iomgr/unix_sockets_posix.h"
48 #include "src/core/lib/slice/slice_internal.h"
// Trace flag defined elsewhere; this file tests it with
// GRPC_TRACE_FLAG_ENABLED before emitting its CLIENT_CONNECT log lines.
50 extern grpc_core::TraceFlag grpc_tcp_trace;
// NOTE(review): these declarations look like members of the async_connect
// record used by the callbacks in this file (they are reached as ac->...).
// The enclosing struct header and the other members referenced by the code
// (mu, fd, alarm, refs, addr_str, ep) are not visible in this view -- confirm.
//
// Closure initialised with tc_on_alarm and handed to grpc_timer_init as the
// deadline callback.
56 grpc_closure on_alarm;
// Closure initialised with on_writable and armed via grpc_fd_notify_on_write.
58 grpc_closure write_closure;
// Pollset set the connecting fd is added to, and removed from once the
// connect attempt resolves.
59 grpc_pollset_set* interested_parties;
// Caller-supplied completion closure; on_writable schedules it with the final
// error value.
62 grpc_closure* closure;
// Private copy made with grpc_channel_args_copy; released with
// grpc_channel_args_destroy when the record is torn down.
63 grpc_channel_args* channel_args;
// Applies the client-side socket options to `fd` one after another.
//
// addr:         destination address; only used here to decide whether the
//               socket is a unix-domain socket.
// fd:           raw socket descriptor to configure.
// channel_args: consulted for the TCP user timeout and scanned for a socket
//               mutator argument.
//
// Every step stores its result in `err` and jumps to the `error` label as soon
// as one step reports something other than GRPC_ERROR_NONE.
// NOTE(review): the `error` label, the return statements and several closing
// braces are not visible in this view, so the exact cleanup performed on
// failure cannot be confirmed from here.
66 static grpc_error* prepare_socket(const grpc_resolved_address* addr, int fd,
67 const grpc_channel_args* channel_args) {
68 grpc_error* err = GRPC_ERROR_NONE;
// Options applied regardless of address family.
72 err = grpc_set_socket_nonblocking(fd, 1);
73 if (err != GRPC_ERROR_NONE) goto error;
74 err = grpc_set_socket_cloexec(fd, 1);
75 if (err != GRPC_ERROR_NONE) goto error;
// The following options are skipped for unix-domain sockets.
76 if (!grpc_is_unix_socket(addr)) {
77 err = grpc_set_socket_low_latency(fd, 1);
78 if (err != GRPC_ERROR_NONE) goto error;
79 err = grpc_set_socket_reuse_addr(fd, 1);
80 if (err != GRPC_ERROR_NONE) goto error;
81 err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
82 true /* is_client */);
83 if (err != GRPC_ERROR_NONE) goto error;
// NOTE(review): presumably the unix-socket branch closes before this point so
// that SIGPIPE suppression applies to every socket type -- confirm.
85 err = grpc_set_socket_no_sigpipe_if_possible(fd);
86 if (err != GRPC_ERROR_NONE) goto error;
// Look through the channel args for GRPC_ARG_SOCKET_MUTATOR entries and let
// each one adjust the socket. The arg must be of pointer type (asserted).
// NOTE(review): channel_args is dereferenced here; a null guard is not visible
// in this view -- confirm one exists.
88 for (size_t i = 0; i < channel_args->num_args; i++) {
89 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
90 GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
91 grpc_socket_mutator* mutator = static_cast<grpc_socket_mutator*>(
92 channel_args->args[i].value.pointer.p);
93 err = grpc_set_socket_with_mutator(fd, mutator);
94 if (err != GRPC_ERROR_NONE) goto error;
// Timer callback registered through ac->on_alarm (see grpc_timer_init in
// grpc_tcp_client_create_from_prepared_fd).
//
// acp:   the async_connect record, passed as an opaque pointer.
// error: error value delivered with the timer callback; only logged here.
//
// Under ac->mu it acts on the still-present fd using a "connect() timed out"
// error and drops one reference on the record.
// NOTE(review): the declaration of `done`, the name of the function invoked on
// ac->fd, and the condition guarding the teardown at the end are not visible
// in this view.
108 static void tc_on_alarm(void* acp, grpc_error* error) {
110 async_connect* ac = static_cast<async_connect*>(acp);
111 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
112 const char* str = grpc_error_string(error);
113 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
116 gpr_mu_lock(&ac->mu);
// A null ac->fd means there is no descriptor left to act on.
117 if (ac->fd != nullptr) {
119 ac->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"));
// `done` is computed while the lock is held; it records whether this call
// released the last reference.
121 done = (--ac->refs == 0);
122 gpr_mu_unlock(&ac->mu);
// Release everything the record owns: its mutex, the address string and the
// channel args copy. NOTE(review): presumably executed only when `done` is
// true -- confirm.
124 gpr_mu_destroy(&ac->mu);
125 gpr_free(ac->addr_str);
126 grpc_channel_args_destroy(ac->channel_args);
// Builds the client endpoint for a connected fd by forwarding all three
// arguments unchanged to grpc_tcp_create and returning its result.
131 grpc_endpoint* grpc_tcp_client_create_from_fd(
132 grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str) {
133 return grpc_tcp_create(fd, channel_args, addr_str);
// Write-readiness callback registered through ac->write_closure; it resolves
// an in-progress connect() attempt.
//
// acp:   the async_connect record, passed as an opaque pointer.
// error: error value delivered with the callback. A reference is taken on it
//        and the (possibly replaced or annotated) value is what the caller's
//        closure finally receives.
//
// Visible flow: log, cancel the deadline timer, read SO_ERROR from the socket,
// then either publish an endpoint through *ep, re-arm the write notification,
// or build an OS error. Finally drop one reference on the record, annotate
// any error with a description and the target address, and schedule the
// caller's closure.
// NOTE(review): the declarations of so_error, err, fd and done, and the
// switch/case/goto lines and labels that connect these steps, are not visible
// in this view; branch membership noted below is inferred and must be
// confirmed.
136 static void on_writable(void* acp, grpc_error* error) {
137 async_connect* ac = static_cast<async_connect*>(acp);
139 socklen_t so_error_size;
// Copy the output slot and completion closure into locals so they can be used
// after the record may have been released.
142 grpc_endpoint** ep = ac->ep;
143 grpc_closure* closure = ac->closure;
146 GRPC_ERROR_REF(error);
148 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
149 const char* str = grpc_error_string(error);
150 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s", ac->addr_str,
154 gpr_mu_lock(&ac->mu);
158 gpr_mu_unlock(&ac->mu);
// The timer is cancelled between the unlock above and the lock below, i.e.
// while ac->mu is not held.
160 grpc_timer_cancel(&ac->alarm);
162 gpr_mu_lock(&ac->mu);
// A non-OK incoming error is tagged with the OS error string
// "Timeout occurred".
163 if (error != GRPC_ERROR_NONE) {
165 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
166 grpc_slice_from_static_string("Timeout occurred"));
// Fetch the pending socket error; the call is retried while it fails with
// EINTR, and a getsockopt failure itself is reported as an OS error.
171 so_error_size = sizeof(so_error);
172 err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
174 } while (err < 0 && errno == EINTR);
176 error = GRPC_OS_ERROR(errno, "getsockopt");
// Detach the fd from the interested pollset set and wrap it in an endpoint
// published through *ep. NOTE(review): presumably the so_error == 0 branch.
182 grpc_pollset_set_del_fd(ac->interested_parties, fd);
183 *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str);
187 /* We will get one of these errors if we have run out of
188 memory in the kernel for the data structures allocated
189 when you connect a socket. If this happens it is very
190 likely that if we wait a little bit then try again the
191 connection will work (since other programs or this
192 program will close their network connections and free up
193 memory). This does _not_ indicate that there is anything
194 wrong with the server we are connecting to, this is a
197 If you are looking at this code, then chances are that
198 your program or another program on the same computer
199 opened too many network connections. The "easy" fix:
201 gpr_log(GPR_ERROR, "kernel out of buffers");
202 gpr_mu_unlock(&ac->mu);
203 grpc_fd_notify_on_write(fd, &ac->write_closure);
206 /* This error shouldn't happen for anything other than connect(). */
207 error = GRPC_OS_ERROR(so_error, "connect");
210 /* We don't really know which syscall triggered the problem here,
211 so punt by reporting getsockopt(). */
212 error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
// Common exit path: detach and orphan the fd. NOTE(review): presumably only
// when the fd was not handed over to an endpoint above -- confirm the guard.
218 grpc_pollset_set_del_fd(ac->interested_parties, fd);
219 grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
// Drop this callback's reference while still holding the lock.
222 done = (--ac->refs == 0);
223 // Create a copy of the data from "ac" to be accessed after the unlock, as
224 // "ac" and its contents may be deallocated by the time they are read.
225 const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
226 gpr_mu_unlock(&ac->mu);
// On failure, replace the error description with
// "Failed to connect to remote host: <original description>" and attach the
// target address; the copied address slice is unreferenced at L135 when it is
// not consumed (the branch structure around that is not visible here).
227 if (error != GRPC_ERROR_NONE) {
230 bool ret = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &str);
232 char* desc = grpc_slice_to_c_string(str);
233 gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", desc);
234 error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
235 grpc_slice_from_copied_string(error_descr));
236 gpr_free(error_descr);
238 error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
239 addr_str_slice /* takes ownership */);
241 grpc_slice_unref_internal(addr_str_slice);
244 // This is safe even outside the lock, because "done", the sentinel, is
245 // populated *inside* the lock.
246 gpr_mu_destroy(&ac->mu);
247 gpr_free(ac->addr_str);
248 grpc_channel_args_destroy(ac->channel_args);
// Hand the final result to the caller using the closure saved at the top.
251 GRPC_CLOSURE_SCHED(closure, error);
// Creates and configures a socket for connecting to `addr` and wraps it in a
// grpc_fd.
//
// channel_args: forwarded to prepare_socket for option/mutator handling.
// addr:         the resolved destination address.
// mapped_addr:  out-parameter; receives the address form that matches the
//               socket that was actually created.
// fdobj:        out-parameter; receives the grpc_fd created for the new
//               socket, named "tcp-client:<uri>".
//
// Returns GRPC_ERROR_NONE once the fd object has been created.
// NOTE(review): the final parameter's declaration, several locals (error, fd,
// name, addr_str) and the early-return bodies for the failure checks are not
// visible in this view.
254 grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
255 const grpc_resolved_address* addr,
256 grpc_resolved_address* mapped_addr,
258 grpc_dualstack_mode dsmode;
264 /* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
266 if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
267 /* addr is v4 mapped to v6 or v6. */
268 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
// Open the stream socket; `dsmode` reports which dualstack mode was obtained
// and `fd` receives the descriptor.
271 grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
272 if (error != GRPC_ERROR_NONE) {
// If only an IPv4 socket could be created, mapped_addr must be put back into
// IPv4 form.
275 if (dsmode == GRPC_DSMODE_IPV4) {
276 /* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
277 if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
278 memcpy(mapped_addr, addr, sizeof(*mapped_addr));
281 if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
// Name the fd object after the destination URI.
285 addr_str = grpc_sockaddr_to_uri(mapped_addr);
286 gpr_asprintf(&name, "tcp-client:%s", addr_str);
287 *fdobj = grpc_fd_create(fd, name, true);
290 return GRPC_ERROR_NONE;
// Starts connect() on an already prepared fd and arranges for `closure` to be
// scheduled with the outcome.
//
// interested_parties: pollset set the fd is added to while the connect is
//                     pending.
// closure:            scheduled with GRPC_ERROR_NONE or an OS error; on the
//                     asynchronous path it is stored in the async_connect
//                     record instead.
// fdobj:              the prepared grpc_fd.
// channel_args:       used to build the endpoint, or copied into the record.
// addr:               destination passed to connect().
// deadline:           expiry passed to grpc_timer_init for the connect timer.
// ep:                 out-parameter that receives the endpoint.
//
// NOTE(review): the conditions separating the three outcomes below, the
// declarations of `err` and `ac`, and the initialisation of the record's fd,
// ep and refs members are not visible in this view.
293 void grpc_tcp_client_create_from_prepared_fd(
294 grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
295 const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
296 grpc_millis deadline, grpc_endpoint** ep) {
297 const int fd = grpc_fd_wrapped_fd(fdobj);
// connect() is retried for as long as it fails with EINTR.
301 err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
303 } while (err < 0 && errno == EINTR);
// Outcome 1 (presumably err >= 0): build the endpoint right away and report
// success.
305 char* addr_str = grpc_sockaddr_to_uri(addr);
306 *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
308 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
// Outcome 2: any errno other than EWOULDBLOCK/EINPROGRESS is a hard failure;
// the fd is orphaned and the closure gets the OS error.
311 if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
312 grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
313 GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
// Outcome 3: the connect is still in progress. Register the fd for polling and
// build the async_connect record shared by on_writable and tc_on_alarm.
317 grpc_pollset_set_add_fd(interested_parties, fdobj);
319 ac = static_cast<async_connect*>(gpr_malloc(sizeof(async_connect)));
320 ac->closure = closure;
323 ac->interested_parties = interested_parties;
324 ac->addr_str = grpc_sockaddr_to_uri(addr);
325 gpr_mu_init(&ac->mu);
327 GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
328 grpc_schedule_on_exec_ctx);
// The record keeps its own copy of the channel args.
329 ac->channel_args = grpc_channel_args_copy(channel_args);
331 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
332 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
333 ac->addr_str, fdobj);
// The timer and the write notification are both armed while ac->mu is held.
336 gpr_mu_lock(&ac->mu);
337 GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
338 grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
339 grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
340 gpr_mu_unlock(&ac->mu);
// Connect entry point installed in grpc_posix_tcp_client_vtable.
//
// Prepares a socket/fd for `addr` with grpc_tcp_client_prepare_fd; if that
// fails, `closure` is scheduled with the resulting error. Otherwise the
// prepared fd and the mapped address are passed on to
// grpc_tcp_client_create_from_prepared_fd, which performs the connect.
// NOTE(review): the declaration of `error`, the early return after the
// failure path and the tail of the final call are not visible in this view.
343 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
344 grpc_pollset_set* interested_parties,
345 const grpc_channel_args* channel_args,
346 const grpc_resolved_address* addr,
347 grpc_millis deadline) {
348 grpc_resolved_address mapped_addr;
349 grpc_fd* fdobj = nullptr;
352 if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
353 &fdobj)) != GRPC_ERROR_NONE) {
354 GRPC_CLOSURE_SCHED(closure, error);
357 grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
358 channel_args, &mapped_addr, deadline,
// POSIX TCP client vtable; its single entry points at tcp_connect.
362 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};