3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
28 #include "src/core/lib/iomgr/error.h"
29 #include "src/core/lib/iomgr/iomgr_custom.h"
30 #include "src/core/lib/iomgr/sockaddr_utils.h"
31 #include "src/core/lib/iomgr/tcp_client.h"
32 #include "src/core/lib/iomgr/tcp_custom.h"
33 #include "src/core/lib/iomgr/timer.h"
35 extern grpc_core::TraceFlag grpc_tcp_trace;
36 extern grpc_socket_vtable* grpc_custom_socket_vtable;
38 struct grpc_custom_tcp_connect {
39 grpc_custom_socket* socket;
41 grpc_closure on_alarm;
42 grpc_closure* closure;
43 grpc_endpoint** endpoint;
46 grpc_resource_quota* resource_quota;
49 static void custom_tcp_connect_cleanup(grpc_custom_tcp_connect* connect) {
50 grpc_custom_socket* socket = connect->socket;
51 grpc_resource_quota_unref_internal(connect->resource_quota);
52 gpr_free(connect->addr_name);
55 if (socket->refs == 0) {
56 grpc_custom_socket_vtable->destroy(socket);
61 static void custom_close_callback(grpc_custom_socket* socket) {}
63 static void on_alarm(void* acp, grpc_error* error) {
65 grpc_custom_socket* socket = (grpc_custom_socket*)acp;
66 grpc_custom_tcp_connect* connect = socket->connector;
67 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
68 const char* str = grpc_error_string(error);
69 gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
70 connect->addr_name, str);
72 if (error == GRPC_ERROR_NONE) {
73 /* error == NONE implies that the timer ran out, and wasn't cancelled. If
74 it was cancelled, then the handler that cancelled it also should close
75 the handle, if applicable */
76 grpc_custom_socket_vtable->close(socket, custom_close_callback);
78 done = (--connect->refs == 0);
80 custom_tcp_connect_cleanup(connect);
84 static void custom_connect_callback_internal(grpc_custom_socket* socket,
86 grpc_custom_tcp_connect* connect = socket->connector;
88 grpc_closure* closure = connect->closure;
89 grpc_timer_cancel(&connect->alarm);
90 if (error == GRPC_ERROR_NONE) {
91 *connect->endpoint = custom_tcp_endpoint_create(
92 socket, connect->resource_quota, connect->addr_name);
94 done = (--connect->refs == 0);
96 grpc_core::ExecCtx::Get()->Flush();
97 custom_tcp_connect_cleanup(connect);
99 GRPC_CLOSURE_SCHED(closure, error);
102 static void custom_connect_callback(grpc_custom_socket* socket,
104 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
105 if (grpc_core::ExecCtx::Get() == nullptr) {
106 /* If we are being run on a thread which does not have an exec_ctx created
107 * yet, we should create one. */
108 grpc_core::ExecCtx exec_ctx;
109 custom_connect_callback_internal(socket, error);
111 custom_connect_callback_internal(socket, error);
115 static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
116 grpc_pollset_set* interested_parties,
117 const grpc_channel_args* channel_args,
118 const grpc_resolved_address* resolved_addr,
119 grpc_millis deadline) {
120 GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
122 (void)interested_parties;
123 grpc_custom_tcp_connect* connect;
124 grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr);
125 if (channel_args != nullptr) {
126 for (size_t i = 0; i < channel_args->num_args; i++) {
127 if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
128 grpc_resource_quota_unref_internal(resource_quota);
129 resource_quota = grpc_resource_quota_ref_internal(
130 (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
134 grpc_custom_socket* socket =
135 (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
137 grpc_custom_socket_vtable->init(socket, GRPC_AF_UNSPEC);
139 (grpc_custom_tcp_connect*)gpr_malloc(sizeof(grpc_custom_tcp_connect));
140 connect->closure = closure;
141 connect->endpoint = ep;
142 connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
143 connect->resource_quota = resource_quota;
144 connect->socket = socket;
145 socket->connector = connect;
146 socket->endpoint = nullptr;
147 socket->listener = nullptr;
150 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
151 gpr_log(GPR_INFO, "CLIENT_CONNECT: %p %s: asynchronously connecting",
152 socket, connect->addr_name);
155 GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket,
156 grpc_schedule_on_exec_ctx);
157 grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
158 grpc_custom_socket_vtable->connect(
159 socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
160 custom_connect_callback);
163 grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect};