3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include <grpc/support/port_platform.h>
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/log_windows.h>
28 #include <grpc/support/string_util.h>
29 #include <grpc/support/time.h>
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/iomgr/combiner.h"
34 #include "src/core/lib/iomgr/socket_windows.h"
35 #include "src/core/lib/iomgr/tcp_windows.h"
36 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
39 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
41 /* TODO(apolcyn): remove this hack after fixing upstream.
42 * Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
43 * which uses "struct iovec" type, which on Windows is defined inside of
44 * a c-ares header that is not public.
45 * See https://github.com/c-ares/c-ares/issues/206. */
53 /* c-ares creates its own sockets and is meant to read them when readable and
54 * write them when writeable. To fit this socket usage model into the grpc
55 * windows poller (which gives notifications when attempted reads and writes are
56 * actually fulfilled rather than possible), this GrpcPolledFdWindows class
57 * takes advantage of the ares_set_socket_functions API and acts as a virtual
58 * socket. It holds its own read and write buffers which are written to and read
59 * from c-ares and are used with the grpc windows poller, and it, e.g.,
60 * manufactures virtual socket error codes when it needs to tell the c-ares
61 * library to wait for an async read. */
62 class GrpcPolledFdWindows : public GrpcPolledFd {
// Write state entered by OnIocpWriteableInner once an overlapped write
// completes without error; SendV then compares the retried payload against
// write_buf_ before returning to WRITE_IDLE.
68 WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
// Wraps the c-ares socket `as` in a grpc_winsocket and takes a ref on
// `combiner`. Both buffers start out as empty slices, the write state starts
// at WRITE_IDLE, and the fd is not yet marked as being in the driver's list.
71 GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner)
72 : read_buf_(grpc_empty_slice()),
73 write_buf_(grpc_empty_slice()),
74 write_state_(WRITE_IDLE),
75 gotten_into_driver_list_(false) {
// name_ is heap-allocated here and doubles as the winsocket name and the
// combiner ref reason.
76 gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
77 winsocket_ = grpc_winsocket_create(as, name_);
78 combiner_ = GRPC_COMBINER_REF(combiner, name_);
// The "outer" closures are what get registered with the winsocket for
// notifications; they are scheduled on the combiner and call back into
// this object.
79 GRPC_CLOSURE_INIT(&outer_read_closure_,
80 &GrpcPolledFdWindows::OnIocpReadable, this,
81 grpc_combiner_scheduler(combiner_));
82 GRPC_CLOSURE_INIT(&outer_write_closure_,
83 &GrpcPolledFdWindows::OnIocpWriteable, this,
84 grpc_combiner_scheduler(combiner_));
// Releases the combiner ref and both buffers, then destroys the winsocket.
// It is a fatal assertion failure to destroy this object while a read or
// write closure is still registered.
87 ~GrpcPolledFdWindows() {
88 GRPC_COMBINER_UNREF(combiner_, name_);
89 grpc_slice_unref_internal(read_buf_);
90 grpc_slice_unref_internal(write_buf_);
91 GPR_ASSERT(read_closure_ == nullptr);
92 GPR_ASSERT(write_closure_ == nullptr);
93 grpc_winsocket_destroy(winsocket_);
// Schedules the registered read closure with `error` and clears the pointer
// so that a new read closure can be registered.
97 void ScheduleAndNullReadClosure(grpc_error* error) {
98 GRPC_CLOSURE_SCHED(read_closure_, error);
99 read_closure_ = nullptr;
// Schedules the registered write closure with `error` and clears the pointer
// so that a new write closure can be registered.
102 void ScheduleAndNullWriteClosure(grpc_error* error) {
103 GRPC_CLOSURE_SCHED(write_closure_, error);
104 write_closure_ = nullptr;
// Registers `read_closure` and starts an overlapped WSARecvFrom into a
// freshly allocated read buffer. Requires that no read closure is already
// registered and that the previous read buffer has been fully consumed.
107 void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
108 GPR_ASSERT(read_closure_ == nullptr);
109 read_closure_ = read_closure;
110 GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
111 grpc_slice_unref_internal(read_buf_);
112 GPR_ASSERT(!read_buf_has_data_);
// NOTE(review): 4192 is an unusual capacity; possibly 4096 was intended —
// confirm.
113 read_buf_ = GRPC_SLICE_MALLOC(4192);
115 buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
116 buffer.len = GRPC_SLICE_LENGTH(read_buf_);
117 memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
// The sender's address is captured so that RecvFrom can later hand it back
// to c-ares.
118 recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
// A non-zero return means the receive did not complete immediately.
120 if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
121 nullptr, &flags, (sockaddr*)recv_from_source_addr_,
122 &recv_from_source_addr_len_,
123 &winsocket_->read_info.overlapped, nullptr)) {
124 int wsa_last_error = WSAGetLastError();
125 char* msg = gpr_format_message(wsa_last_error);
126 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
127 GRPC_CARES_TRACE_LOG(
128 "RegisterForOnReadableLocked: WSARecvFrom error:|%s|. fd:|%s|", msg,
// Any error other than WSA_IO_PENDING is reported to the read closure.
131 if (wsa_last_error != WSA_IO_PENDING) {
132 ScheduleAndNullReadClosure(error);
// Ask the winsocket to run outer_read_closure_ (OnIocpReadable) for the
// receive started above.
136 grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
// Registers `write_closure` and acts on the current write state. Requires
// that no write closure is already registered.
139 void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
140 GRPC_CARES_TRACE_LOG(
141 "RegisterForOnWriteableLocked. fd:|%s|. Current write state: %d",
142 GetName(), write_state_);
143 GPR_ASSERT(write_closure_ == nullptr);
144 write_closure_ = write_closure;
145 switch (write_state_) {
// In this state the closure is run right away with no error.
147 ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
// A previous synchronous send attempt reported WSA_IO_PENDING (see
// TrySendWriteBufSyncNonBlocking): re-issue the buffered write as an
// overlapped send and wait for its completion notification.
149 case WRITE_REQUESTED:
150 write_state_ = WRITE_PENDING;
151 SendWriteBuf(nullptr, &winsocket_->write_info.overlapped);
152 grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
155 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
// Returns true while the read buffer still holds bytes that have not been
// handed to c-ares.
160 bool IsFdStillReadableLocked() override {
161 return GRPC_SLICE_LENGTH(read_buf_) > 0;
// Shuts down the wrapped winsocket. `error` is not used by the visible call.
164 void ShutdownLocked(grpc_error* error) override {
165 grpc_winsocket_shutdown(winsocket_);
// Returns the underlying socket handle wrapped by the winsocket.
168 ares_socket_t GetWrappedAresSocketLocked() override {
169 return grpc_winsocket_wrapped_socket(winsocket_);
// Returns the name string formatted in the constructor.
172 const char* GetName() override { return name_; }
// Virtual recvfrom used by c-ares: serves bytes out of read_buf_, which is
// filled by the overlapped receive started in RegisterForOnReadableLocked.
// When no data is buffered, the last WSA error is set to WSAEWOULDBLOCK.
// NOTE(review): data_len is declared as ares_socket_t, while
// SockToPolledFdMap::RecvFrom forwards a size_t — confirm the intended type.
174 ares_ssize_t RecvFrom(void* data, ares_socket_t data_len, int flags,
175 struct sockaddr* from, ares_socklen_t* from_len) {
176 GRPC_CARES_TRACE_LOG(
177 "RecvFrom called on fd:|%s|. Current read buf length:|%d|", GetName(),
178 GRPC_SLICE_LENGTH(read_buf_));
179 if (!read_buf_has_data_) {
180 WSASetLastError(WSAEWOULDBLOCK);
183 ares_ssize_t bytes_read = 0;
// Copy at most data_len bytes from the front of the read buffer.
184 for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
185 ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
// Drop the consumed prefix; what remains is served by later calls.
188 read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
189 GRPC_SLICE_LENGTH(read_buf_));
190 if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
191 read_buf_has_data_ = false;
193 /* c-ares overloads this recv_from virtual socket function to receive
194 * data on both UDP and TCP sockets, and from is nullptr for TCP. */
195 if (from != nullptr) {
// NOTE(review): the memcpy below writes recv_from_source_addr_len_ bytes
// into `from`, but this assert only requires *from_len to be no larger
// than that length, so it does not bound the copy by the caller's buffer
// size — confirm the comparison direction.
196 GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
197 memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
198 *from_len = recv_from_source_addr_len_;
// Concatenates the `iov_count` buffers in `iov` into one newly allocated
// slice whose length is the sum of the individual iov_len values.
203 grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
// First pass: compute the total payload size.
205 for (int i = 0; i < iov_count; i++) {
206 total += iov[i].iov_len;
208 grpc_slice out = GRPC_SLICE_MALLOC(total);
// Second pass: copy each buffer byte by byte into the output slice.
210 for (int i = 0; i < iov_count; i++) {
211 for (int k = 0; k < iov[i].iov_len; k++) {
212 GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
// Issues a WSASend of the whole write_buf_ on the wrapped socket.
// `bytes_sent_ptr` and `overlapped` are passed straight through to WSASend,
// and the WSASend result is stored in `out`.
// NOTE(review): the trace log below dereferences bytes_sent_ptr, but
// RegisterForOnWriteableLocked calls this with bytes_sent_ptr == nullptr.
// If GRPC_CARES_TRACE_LOG evaluates its arguments, that is a null
// dereference — confirm and guard.
218 int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped) {
220 buf.len = GRPC_SLICE_LENGTH(write_buf_);
221 buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
223 int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
224 bytes_sent_ptr, flags, overlapped, nullptr);
225 GRPC_CARES_TRACE_LOG(
226 "WSASend: name:%s. buf len:%d. bytes sent: %d. overlapped %p. return "
228 GetName(), buf.len, *bytes_sent_ptr, overlapped, out);
// Attempts to send write_buf_ without an OVERLAPPED structure. Must only be
// called in the WRITE_IDLE state.
232 ares_ssize_t TrySendWriteBufSyncNonBlocking() {
233 GPR_ASSERT(write_state_ == WRITE_IDLE);
234 DWORD bytes_sent = 0;
235 if (SendWriteBuf(&bytes_sent, nullptr) != 0) {
236 int wsa_last_error = WSAGetLastError();
237 char* msg = gpr_format_message(wsa_last_error);
238 GRPC_CARES_TRACE_LOG(
239 "TrySendWriteBufSyncNonBlocking: SendWriteBuf error:|%s|. fd:|%s|",
// Translate WSA_IO_PENDING into WSAEWOULDBLOCK for c-ares and remember
// that a write is wanted; RegisterForOnWriteableLocked will issue it as
// an overlapped send.
242 if (wsa_last_error == WSA_IO_PENDING) {
243 WSASetLastError(WSAEWOULDBLOCK);
244 write_state_ = WRITE_REQUESTED;
// Drop the bytes that were sent from the front of the write buffer.
247 write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
248 GRPC_SLICE_LENGTH(write_buf_));
// Virtual sendv used by c-ares. Behaviour depends on the current write state.
252 ares_ssize_t SendV(const struct iovec* iov, int iov_count) {
253 GRPC_CARES_TRACE_LOG("SendV called on fd:|%s|. Current write state: %d",
254 GetName(), write_state_);
255 switch (write_state_) {
// Fresh write: the previous buffer must be fully consumed. Flatten the
// iovec into write_buf_ and try to send it immediately.
257 GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
258 grpc_slice_unref_internal(write_buf_);
259 write_buf_ = FlattenIovec(iov, iov_count);
260 return TrySendWriteBufSyncNonBlocking();
// A write has already been requested: report "would block" to c-ares.
261 case WRITE_REQUESTED:
263 WSASetLastError(WSAEWOULDBLOCK);
// An overlapped write finished since the last call. The retried payload
// must be at least as long as write_buf_ and must start with the same
// bytes; both are enforced with assertions.
265 case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
266 grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
267 GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
268 GRPC_SLICE_LENGTH(write_buf_));
269 ares_ssize_t total_sent = 0;
270 for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
271 GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
272 GRPC_SLICE_START_PTR(write_buf_)[i]);
275 grpc_slice_unref_internal(write_buf_);
277 grpc_slice_sub_no_ref(currently_attempted, total_sent,
278 GRPC_SLICE_LENGTH(currently_attempted));
// Return to idle and try another immediate send, adding its result to
// the running total.
279 write_state_ = WRITE_IDLE;
280 total_sent += TrySendWriteBufSyncNonBlocking();
// Virtual connect used by c-ares: calls WSAConnect on the wrapped socket and
// trace-logs the resulting WSA error code and message.
286 int Connect(const struct sockaddr* target, ares_socklen_t target_len) {
287 SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
288 GRPC_CARES_TRACE_LOG("Connect: fd:|%s|", GetName());
290 WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
292 int wsa_last_error = WSAGetLastError();
293 char* msg = gpr_format_message(wsa_last_error);
294 GRPC_CARES_TRACE_LOG("Connect error code:|%d|, msg:|%s|. fd:|%s|",
295 wsa_last_error, msg, GetName());
297 // c-ares expects a posix-style connect API
// Static trampoline for outer_read_closure_: `arg` is the
// GrpcPolledFdWindows registered in the constructor.
303 static void OnIocpReadable(void* arg, grpc_error* error) {
304 GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
305 polled_fd->OnIocpReadableInner(error);
308 // TODO(apolcyn): improve this error handling to be less conservative.
309 // An e.g. ECONNRESET error here should result in errors when
310 // c-ares reads from this socket later, but it shouldn't necessarily cancel
311 // the entire resolution attempt. Doing so will allow the "inject broken
312 // nameserver list" test to pass on Windows.
// Completion handler for the overlapped receive. On success it trims
// read_buf_ to the number of bytes transferred and marks it as holding data.
// In all cases it finishes by scheduling the registered read closure with
// the resulting error.
313 void OnIocpReadableInner(grpc_error* error) {
314 if (error == GRPC_ERROR_NONE) {
315 if (winsocket_->read_info.wsa_error != 0) {
316 /* WSAEMSGSIZE would be due to receiving more data
317 * than our read buffer's fixed capacity. Assume that
318 * the connection is TCP and read the leftovers
319 * in subsequent c-ares reads. */
320 if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
// Replace the incoming error with one built from the WSA error
// message.
321 GRPC_ERROR_UNREF(error);
322 char* msg = gpr_format_message(winsocket_->read_info.wsa_error);
323 GRPC_CARES_TRACE_LOG(
324 "OnIocpReadableInner. winsocket error:|%s|. fd:|%s|", msg,
326 error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
331 if (error == GRPC_ERROR_NONE) {
// Keep only the bytes actually received.
332 read_buf_ = grpc_slice_sub_no_ref(read_buf_, 0,
333 winsocket_->read_info.bytes_transfered);
334 read_buf_has_data_ = true;
// Discard the buffer and reset it to an empty slice.
336 grpc_slice_unref_internal(read_buf_);
337 read_buf_ = grpc_empty_slice();
339 GRPC_CARES_TRACE_LOG(
340 "OnIocpReadable finishing. read buf length now:|%d|. :fd:|%s|",
341 GRPC_SLICE_LENGTH(read_buf_), GetName());
342 ScheduleAndNullReadClosure(error);
// Static trampoline for outer_write_closure_: `arg` is the
// GrpcPolledFdWindows registered in the constructor.
345 static void OnIocpWriteable(void* arg, grpc_error* error) {
346 GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
347 polled_fd->OnIocpWriteableInner(error);
// Completion handler for the overlapped send started in
// RegisterForOnWriteableLocked. Must be called in the WRITE_PENDING state.
// It always finishes by scheduling the registered write closure with the
// resulting error.
350 void OnIocpWriteableInner(grpc_error* error) {
351 GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
352 if (error == GRPC_ERROR_NONE) {
353 if (winsocket_->write_info.wsa_error != 0) {
// Replace the incoming error with one built from the WSA error message.
354 char* msg = gpr_format_message(winsocket_->write_info.wsa_error);
355 GRPC_CARES_TRACE_LOG(
356 "OnIocpWriteableInner. winsocket error:|%s|. fd:|%s|", msg,
358 GRPC_ERROR_UNREF(error);
359 error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
363 GPR_ASSERT(write_state_ == WRITE_PENDING);
364 if (error == GRPC_ERROR_NONE) {
// Keep exactly the bytes that were transferred, so that SendV can check
// the retried payload against them.
365 write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
366 write_buf_ = grpc_slice_sub_no_ref(
367 write_buf_, 0, winsocket_->write_info.bytes_transfered);
// Discard the buffer and reset it to an empty slice.
369 grpc_slice_unref_internal(write_buf_);
370 write_buf_ = grpc_empty_slice();
372 ScheduleAndNullWriteClosure(error);
// Accessors for the flag that GrpcPolledFdFactoryWindows sets in
// NewGrpcPolledFdLocked and that SockToPolledFdMap::CloseSocket reads.
375 bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
376 void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
// Combiner ref taken in the constructor and released in the destructor.
378 grpc_combiner* combiner_;
// Source address written by WSARecvFrom and copied out by RecvFrom.
379 char recv_from_source_addr_[200];
380 ares_socklen_t recv_from_source_addr_len_;
// Received bytes that have not yet been handed to c-ares.
381 grpc_slice read_buf_;
// Set when a receive completes without error; cleared once RecvFrom has
// drained read_buf_.
382 bool read_buf_has_data_ = false;
// Bytes of the most recent send attempt.
383 grpc_slice write_buf_;
// Closures registered via RegisterForOnReadableLocked and
// RegisterForOnWriteableLocked; null when none is registered.
384 grpc_closure* read_closure_ = nullptr;
385 grpc_closure* write_closure_ = nullptr;
// Closures registered with the winsocket; they run OnIocpReadable and
// OnIocpWriteable.
386 grpc_closure outer_read_closure_;
387 grpc_closure outer_write_closure_;
388 grpc_winsocket* winsocket_;
389 WriteState write_state_;
// Allocated by gpr_asprintf in the constructor.
390 char* name_ = nullptr;
391 bool gotten_into_driver_list_;
// Node of the singly linked list kept by SockToPolledFdMap; it pairs a
// socket handle with its GrpcPolledFdWindows.
394 struct SockToPolledFdEntry {
395 SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
396 : socket(s), polled_fd(fd) {}
398 GrpcPolledFdWindows* polled_fd;
// Next node in the list, or nullptr at the tail.
399 SockToPolledFdEntry* next = nullptr;
402 /* A SockToPolledFdMap maps ares_socket_t values (SOCKETs on Windows)
403 * to GrpcPolledFdWindows instances, and is used to find the appropriate
404 * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
405 * socket call on the ares_socket_t type. Instances are owned by and one-to-one
406 * with a GrpcPolledFdWindows factory and event driver */
407 class SockToPolledFdMap {
// Takes a ref on `combiner`; the ref is released in the destructor.
409 SockToPolledFdMap(grpc_combiner* combiner) {
410 combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
// The map must be empty when it is destroyed; releases the combiner ref.
413 ~SockToPolledFdMap() {
414 GPR_ASSERT(head_ == nullptr);
415 GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
// Allocates a list entry for (s, polled_fd) and links it in front of the
// current head.
418 void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
419 SockToPolledFdEntry* new_node = New<SockToPolledFdEntry>(s, polled_fd);
420 new_node->next = head_;
// Walks the list from head_ and returns the polled fd whose entry matches
// socket `s`; a matching entry must hold a non-null polled fd.
424 GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
425 for (SockToPolledFdEntry* node = head_; node != nullptr;
427 if (node->socket == s) {
428 GPR_ASSERT(node->polled_fd != nullptr);
429 return node->polled_fd;
// Searches the list for the entry for socket `s`. The list must not be
// empty. `prev` starts at the address of head_.
435 void RemoveEntry(SOCKET s) {
436 GPR_ASSERT(head_ != nullptr);
437 SockToPolledFdEntry** prev = &head_;
438 for (SockToPolledFdEntry* node = head_; node != nullptr;
440 if (node->socket == s) {
450 /* These virtual socket functions are called from within the c-ares
451 * library. These methods generally dispatch those socket calls to the
452 * appropriate methods. The virtual "socket" and "close" methods are
453 * special and instead create/add and remove/destroy GrpcPolledFdWindows
// Virtual "socket" callback: `user_data` is the SockToPolledFdMap. Creates a
// WSA socket with gRPC's default flags, sets it non-blocking, wraps it in a
// new GrpcPolledFdWindows that shares the map's combiner, and records the
// pair in the map.
456 static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
457 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
458 SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
459 grpc_get_default_wsa_socket_flags());
460 if (s == INVALID_SOCKET) {
463 grpc_tcp_set_non_block(s);
464 GrpcPolledFdWindows* polled_fd =
465 New<GrpcPolledFdWindows>(s, map->combiner_);
466 map->AddNewSocket(s, polled_fd);
// Virtual "connect" callback: looks up the polled fd for `as` in the map
// passed as `user_data` and forwards the call to it.
470 static int Connect(ares_socket_t as, const struct sockaddr* target,
471 ares_socklen_t target_len, void* user_data) {
472 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
473 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
474 return polled_fd->Connect(target, target_len);
// Virtual "sendv" callback: looks up the polled fd for `as` in the map
// passed as `user_data` and forwards the call to it.
477 static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
478 int iovec_count, void* user_data) {
479 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
480 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
481 return polled_fd->SendV(iov, iovec_count);
// Virtual "recvfrom" callback: looks up the polled fd for `as` in the map
// passed as `user_data` and forwards the call to it.
484 static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
485 int flags, struct sockaddr* from,
486 ares_socklen_t* from_len, void* user_data) {
487 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
488 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
489 return polled_fd->RecvFrom(data, data_len, flags, from, from_len);
// Virtual "close" callback: `user_data` is the SockToPolledFdMap and `s` is
// the socket whose polled fd is being closed.
492 static int CloseSocket(SOCKET s, void* user_data) {
493 SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
494 GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
496 // If a gRPC polled fd has not made it into the driver's list yet, then
497 // the driver has not seen and will never see this socket, so it is shut
// down here instead.
498 if (!polled_fd->gotten_into_driver_list()) {
499 polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
500 "Shut down c-ares fd before without it ever having made it into the "
// Head of the singly linked list of socket-to-polled-fd entries; nullptr
// when the map is empty.
508 SockToPolledFdEntry* head_ = nullptr;
// Combiner ref held for the lifetime of the map; also handed to each
// GrpcPolledFdWindows created by Socket().
509 grpc_combiner* combiner_;
// Table of virtual socket functions installed on a c-ares channel by
// GrpcPolledFdFactoryWindows::ConfigureAresChannelLocked.
512 const struct ares_socket_functions custom_ares_sock_funcs = {
513 &SockToPolledFdMap::Socket /* socket */,
514 &SockToPolledFdMap::CloseSocket /* close */,
515 &SockToPolledFdMap::Connect /* connect */,
516 &SockToPolledFdMap::RecvFrom /* recvfrom */,
517 &SockToPolledFdMap::SendV /* sendv */,
// GrpcPolledFdFactory for Windows. Owns the SockToPolledFdMap that the
// custom c-ares socket functions receive as their user_data.
520 class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
522 GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
523 : sock_to_polled_fd_map_(combiner) {}
// Looks up the polled fd previously registered in the map for socket `as`.
// `driver_pollset_set` and `combiner` are not used by the visible lines.
525 GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
526 grpc_pollset_set* driver_pollset_set,
527 grpc_combiner* combiner) override {
528 GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
529 // Set a flag so that the virtual socket "close" method knows it
530 // doesn't need to call ShutdownLocked, since now the driver will.
531 polled_fd->set_gotten_into_driver_list();
// Installs the custom socket functions on `channel`, with this factory's
// map as their user_data.
535 void ConfigureAresChannelLocked(ares_channel channel) override {
536 ares_set_socket_functions(channel, &custom_ares_sock_funcs,
537 &sock_to_polled_fd_map_);
541 SockToPolledFdMap sock_to_polled_fd_map_;
// Creates the Windows polled-fd factory for `combiner` and returns it as an
// owning pointer to the GrpcPolledFdFactory base type.
544 UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
545 return UniquePtr<GrpcPolledFdFactory>(
546 New<GrpcPolledFdFactoryWindows>(combiner));
549 } // namespace grpc_core
551 #endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */