Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / filters / client_channel / resolver / dns / c_ares / grpc_ares_ev_driver_windows.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/iomgr/port.h"
21 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
22
23 #include <ares.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/log_windows.h>
28 #include <grpc/support/string_util.h>
29 #include <grpc/support/time.h>
30 #include <string.h>
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/iomgr/combiner.h"
34 #include "src/core/lib/iomgr/iocp_windows.h"
35 #include "src/core/lib/iomgr/sockaddr_utils.h"
36 #include "src/core/lib/iomgr/sockaddr_windows.h"
37 #include "src/core/lib/iomgr/socket_windows.h"
38 #include "src/core/lib/iomgr/tcp_windows.h"
39 #include "src/core/lib/slice/slice_internal.h"
40
41 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
42 #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
43
44 /* TODO(apolcyn): remove this hack after fixing upstream.
45  * Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
46  * which uses "struct iovec" type, which on Windows is defined inside of
47  * a c-ares header that is not public.
48  * See https://github.com/c-ares/c-ares/issues/206. */
struct iovec {
  /* Start of the buffer described by this entry. */
  void* iov_base;
  /* Number of bytes available at iov_base. */
  size_t iov_len;
};
53
54 namespace grpc_core {
55
56 /* c-ares reads and takes action on the error codes of the
57  * "virtual socket operations" in this file, via the WSAGetLastError
58  * APIs. If code in this file wants to set a specific WSA error that
59  * c-ares should read, it must do so by calling SetWSAError() on the
60  * WSAErrorContext instance passed to it. A WSAErrorContext must only be
61  * instantiated at the top of the virtual socket function callstack. */
62 class WSAErrorContext {
63  public:
64   explicit WSAErrorContext(){};
65
66   ~WSAErrorContext() {
67     if (error_ != 0) {
68       WSASetLastError(error_);
69     }
70   }
71
72   /* Disallow copy and assignment operators */
73   WSAErrorContext(const WSAErrorContext&) = delete;
74   WSAErrorContext& operator=(const WSAErrorContext&) = delete;
75
76   void SetWSAError(int error) { error_ = error; }
77
78  private:
79   int error_ = 0;
80 };
81
82 /* c-ares creates its own sockets and is meant to read them when readable and
83  * write them when writeable. To fit this socket usage model into the grpc
84  * windows poller (which gives notifications when attempted reads and writes are
85  * actually fulfilled rather than possible), this GrpcPolledFdWindows class
86  * takes advantage of the ares_set_socket_functions API and acts as a virtual
87  * socket. It holds its own read and write buffers which are written to and read
88  * from c-ares and are used with the grpc windows poller, and it, e.g.,
89  * manufactures virtual socket error codes when it e.g. needs to tell the c-ares
90  * library to wait for an async read. */
91 class GrpcPolledFdWindows : public GrpcPolledFd {
92  public:
93   enum WriteState {
94     WRITE_IDLE,
95     WRITE_REQUESTED,
96     WRITE_PENDING,
97     WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
98   };
99
100   GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner,
101                       int address_family, int socket_type)
102       : read_buf_(grpc_empty_slice()),
103         write_buf_(grpc_empty_slice()),
104         tcp_write_state_(WRITE_IDLE),
105         gotten_into_driver_list_(false),
106         address_family_(address_family),
107         socket_type_(socket_type) {
108     gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
109     winsocket_ = grpc_winsocket_create(as, name_);
110     combiner_ = GRPC_COMBINER_REF(combiner, name_);
111     GRPC_CLOSURE_INIT(&outer_read_closure_,
112                       &GrpcPolledFdWindows::OnIocpReadable, this,
113                       grpc_combiner_scheduler(combiner_));
114     GRPC_CLOSURE_INIT(&outer_write_closure_,
115                       &GrpcPolledFdWindows::OnIocpWriteable, this,
116                       grpc_combiner_scheduler(combiner_));
117     GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
118                       &GrpcPolledFdWindows::OnTcpConnectLocked, this,
119                       grpc_combiner_scheduler(combiner_));
120     GRPC_CLOSURE_INIT(&continue_register_for_on_readable_locked_,
121                       &GrpcPolledFdWindows::ContinueRegisterForOnReadableLocked,
122                       this, grpc_combiner_scheduler(combiner_));
123     GRPC_CLOSURE_INIT(
124         &continue_register_for_on_writeable_locked_,
125         &GrpcPolledFdWindows::ContinueRegisterForOnWriteableLocked, this,
126         grpc_combiner_scheduler(combiner_));
127   }
128
129   ~GrpcPolledFdWindows() {
130     GRPC_COMBINER_UNREF(combiner_, name_);
131     grpc_slice_unref_internal(read_buf_);
132     grpc_slice_unref_internal(write_buf_);
133     GPR_ASSERT(read_closure_ == nullptr);
134     GPR_ASSERT(write_closure_ == nullptr);
135     grpc_winsocket_destroy(winsocket_);
136     gpr_free(name_);
137   }
138
139   void ScheduleAndNullReadClosure(grpc_error* error) {
140     GRPC_CLOSURE_SCHED(read_closure_, error);
141     read_closure_ = nullptr;
142   }
143
144   void ScheduleAndNullWriteClosure(grpc_error* error) {
145     GRPC_CLOSURE_SCHED(write_closure_, error);
146     write_closure_ = nullptr;
147   }
148
149   void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
150     GPR_ASSERT(read_closure_ == nullptr);
151     read_closure_ = read_closure;
152     GPR_ASSERT(GRPC_SLICE_LENGTH(read_buf_) == 0);
153     grpc_slice_unref_internal(read_buf_);
154     GPR_ASSERT(!read_buf_has_data_);
155     read_buf_ = GRPC_SLICE_MALLOC(4192);
156     if (connect_done_) {
157       GRPC_CLOSURE_SCHED(&continue_register_for_on_readable_locked_,
158                          GRPC_ERROR_NONE);
159     } else {
160       GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == nullptr);
161       pending_continue_register_for_on_readable_locked_ =
162           &continue_register_for_on_readable_locked_;
163     }
164   }
165
166   static void ContinueRegisterForOnReadableLocked(void* arg,
167                                                   grpc_error* unused_error) {
168     GrpcPolledFdWindows* grpc_polled_fd =
169         static_cast<GrpcPolledFdWindows*>(arg);
170     grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE);
171   }
172
173   void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) {
174     GRPC_CARES_TRACE_LOG(
175         "fd:|%s| InnerContinueRegisterForOnReadableLocked "
176         "wsa_connect_error_:%d",
177         GetName(), wsa_connect_error_);
178     GPR_ASSERT(connect_done_);
179     if (wsa_connect_error_ != 0) {
180       ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
181       return;
182     }
183     WSABUF buffer;
184     buffer.buf = (char*)GRPC_SLICE_START_PTR(read_buf_);
185     buffer.len = GRPC_SLICE_LENGTH(read_buf_);
186     memset(&winsocket_->read_info.overlapped, 0, sizeof(OVERLAPPED));
187     recv_from_source_addr_len_ = sizeof(recv_from_source_addr_);
188     DWORD flags = 0;
189     if (WSARecvFrom(grpc_winsocket_wrapped_socket(winsocket_), &buffer, 1,
190                     nullptr, &flags, (sockaddr*)recv_from_source_addr_,
191                     &recv_from_source_addr_len_,
192                     &winsocket_->read_info.overlapped, nullptr)) {
193       int wsa_last_error = WSAGetLastError();
194       char* msg = gpr_format_message(wsa_last_error);
195       GRPC_CARES_TRACE_LOG(
196           "fd:|%s| RegisterForOnReadableLocked WSARecvFrom error code:|%d| "
197           "msg:|%s|",
198           GetName(), wsa_last_error, msg);
199       gpr_free(msg);
200       if (wsa_last_error != WSA_IO_PENDING) {
201         ScheduleAndNullReadClosure(
202             GRPC_WSA_ERROR(wsa_last_error, "WSARecvFrom"));
203         return;
204       }
205     }
206     grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
207   }
208
209   void RegisterForOnWriteableLocked(grpc_closure* write_closure) override {
210     if (socket_type_ == SOCK_DGRAM) {
211       GRPC_CARES_TRACE_LOG("fd:|%s| RegisterForOnWriteableLocked called",
212                            GetName());
213     } else {
214       GPR_ASSERT(socket_type_ == SOCK_STREAM);
215       GRPC_CARES_TRACE_LOG(
216           "fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d",
217           GetName(), tcp_write_state_);
218     }
219     GPR_ASSERT(write_closure_ == nullptr);
220     write_closure_ = write_closure;
221     if (connect_done_) {
222       GRPC_CLOSURE_SCHED(&continue_register_for_on_writeable_locked_,
223                          GRPC_ERROR_NONE);
224     } else {
225       GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == nullptr);
226       pending_continue_register_for_on_writeable_locked_ =
227           &continue_register_for_on_writeable_locked_;
228     }
229   }
230
231   static void ContinueRegisterForOnWriteableLocked(void* arg,
232                                                    grpc_error* unused_error) {
233     GrpcPolledFdWindows* grpc_polled_fd =
234         static_cast<GrpcPolledFdWindows*>(arg);
235     grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE);
236   }
237
238   void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) {
239     GRPC_CARES_TRACE_LOG(
240         "fd:|%s| InnerContinueRegisterForOnWriteableLocked "
241         "wsa_connect_error_:%d",
242         GetName(), wsa_connect_error_);
243     GPR_ASSERT(connect_done_);
244     if (wsa_connect_error_ != 0) {
245       ScheduleAndNullWriteClosure(
246           GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
247       return;
248     }
249     if (socket_type_ == SOCK_DGRAM) {
250       ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
251     } else {
252       GPR_ASSERT(socket_type_ == SOCK_STREAM);
253       int wsa_error_code = 0;
254       switch (tcp_write_state_) {
255         case WRITE_IDLE:
256           ScheduleAndNullWriteClosure(GRPC_ERROR_NONE);
257           break;
258         case WRITE_REQUESTED:
259           tcp_write_state_ = WRITE_PENDING;
260           if (SendWriteBuf(nullptr, &winsocket_->write_info.overlapped,
261                            &wsa_error_code) != 0) {
262             ScheduleAndNullWriteClosure(
263                 GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
264           } else {
265             grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
266           }
267           break;
268         case WRITE_PENDING:
269         case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
270           abort();
271       }
272     }
273   }
274
275   bool IsFdStillReadableLocked() override {
276     return GRPC_SLICE_LENGTH(read_buf_) > 0;
277   }
278
279   void ShutdownLocked(grpc_error* error) override {
280     grpc_winsocket_shutdown(winsocket_);
281   }
282
283   ares_socket_t GetWrappedAresSocketLocked() override {
284     return grpc_winsocket_wrapped_socket(winsocket_);
285   }
286
287   const char* GetName() override { return name_; }
288
289   ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
290                         ares_socket_t data_len, int flags,
291                         struct sockaddr* from, ares_socklen_t* from_len) {
292     GRPC_CARES_TRACE_LOG(
293         "fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
294         "length:|%d|",
295         GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
296     if (!read_buf_has_data_) {
297       wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
298       return -1;
299     }
300     ares_ssize_t bytes_read = 0;
301     for (size_t i = 0; i < GRPC_SLICE_LENGTH(read_buf_) && i < data_len; i++) {
302       ((char*)data)[i] = GRPC_SLICE_START_PTR(read_buf_)[i];
303       bytes_read++;
304     }
305     read_buf_ = grpc_slice_sub_no_ref(read_buf_, bytes_read,
306                                       GRPC_SLICE_LENGTH(read_buf_));
307     if (GRPC_SLICE_LENGTH(read_buf_) == 0) {
308       read_buf_has_data_ = false;
309     }
310     /* c-ares overloads this recv_from virtual socket function to receive
311      * data on both UDP and TCP sockets, and from is nullptr for TCP. */
312     if (from != nullptr) {
313       GPR_ASSERT(*from_len <= recv_from_source_addr_len_);
314       memcpy(from, &recv_from_source_addr_, recv_from_source_addr_len_);
315       *from_len = recv_from_source_addr_len_;
316     }
317     return bytes_read;
318   }
319
320   grpc_slice FlattenIovec(const struct iovec* iov, int iov_count) {
321     int total = 0;
322     for (int i = 0; i < iov_count; i++) {
323       total += iov[i].iov_len;
324     }
325     grpc_slice out = GRPC_SLICE_MALLOC(total);
326     size_t cur = 0;
327     for (int i = 0; i < iov_count; i++) {
328       for (int k = 0; k < iov[i].iov_len; k++) {
329         GRPC_SLICE_START_PTR(out)[cur++] = ((char*)iov[i].iov_base)[k];
330       }
331     }
332     return out;
333   }
334
335   int SendWriteBuf(LPDWORD bytes_sent_ptr, LPWSAOVERLAPPED overlapped,
336                    int* wsa_error_code) {
337     WSABUF buf;
338     buf.len = GRPC_SLICE_LENGTH(write_buf_);
339     buf.buf = (char*)GRPC_SLICE_START_PTR(write_buf_);
340     DWORD flags = 0;
341     int out = WSASend(grpc_winsocket_wrapped_socket(winsocket_), &buf, 1,
342                       bytes_sent_ptr, flags, overlapped, nullptr);
343     *wsa_error_code = WSAGetLastError();
344     GRPC_CARES_TRACE_LOG(
345         "fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
346         "overlapped:%p "
347         "return:%d *wsa_error_code:%d",
348         GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
349         overlapped, out, *wsa_error_code);
350     return out;
351   }
352
353   ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
354                      int iov_count) {
355     GRPC_CARES_TRACE_LOG(
356         "fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
357         GetName(), connect_done_, wsa_connect_error_);
358     if (!connect_done_) {
359       wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
360       return -1;
361     }
362     if (wsa_connect_error_ != 0) {
363       wsa_error_ctx->SetWSAError(wsa_connect_error_);
364       return -1;
365     }
366     switch (socket_type_) {
367       case SOCK_DGRAM:
368         return SendVUDP(wsa_error_ctx, iov, iov_count);
369       case SOCK_STREAM:
370         return SendVTCP(wsa_error_ctx, iov, iov_count);
371       default:
372         abort();
373     }
374   }
375
376   ares_ssize_t SendVUDP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
377                         int iov_count) {
378     // c-ares doesn't handle retryable errors on writes of UDP sockets.
379     // Therefore, the sendv handler for UDP sockets must only attempt
380     // to write everything inline.
381     GRPC_CARES_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
382     GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
383     grpc_slice_unref_internal(write_buf_);
384     write_buf_ = FlattenIovec(iov, iov_count);
385     DWORD bytes_sent = 0;
386     int wsa_error_code = 0;
387     if (SendWriteBuf(&bytes_sent, nullptr, &wsa_error_code) != 0) {
388       wsa_error_ctx->SetWSAError(wsa_error_code);
389       char* msg = gpr_format_message(wsa_error_code);
390       GRPC_CARES_TRACE_LOG(
391           "fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
392           wsa_error_code, msg);
393       gpr_free(msg);
394       return -1;
395     }
396     write_buf_ = grpc_slice_sub_no_ref(write_buf_, bytes_sent,
397                                        GRPC_SLICE_LENGTH(write_buf_));
398     return bytes_sent;
399   }
400
401   ares_ssize_t SendVTCP(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
402                         int iov_count) {
403     // The "sendv" handler on TCP sockets buffers up write
404     // requests and returns an artifical WSAEWOULDBLOCK. Writing that buffer out
405     // in the background, and making further send progress in general, will
406     // happen as long as c-ares continues to show interest in writeability on
407     // this fd.
408     GRPC_CARES_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
409                          GetName(), tcp_write_state_);
410     switch (tcp_write_state_) {
411       case WRITE_IDLE:
412         tcp_write_state_ = WRITE_REQUESTED;
413         GPR_ASSERT(GRPC_SLICE_LENGTH(write_buf_) == 0);
414         grpc_slice_unref_internal(write_buf_);
415         write_buf_ = FlattenIovec(iov, iov_count);
416         wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
417         return -1;
418       case WRITE_REQUESTED:
419       case WRITE_PENDING:
420         wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
421         return -1;
422       case WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY:
423         // c-ares is retrying a send on data that we previously returned
424         // WSAEWOULDBLOCK for, but then subsequently wrote out in the
425         // background. Right now, we assume that c-ares is retrying the same
426         // send again. If c-ares still needs to send even more data, we'll get
427         // to it eventually.
428         grpc_slice currently_attempted = FlattenIovec(iov, iov_count);
429         GPR_ASSERT(GRPC_SLICE_LENGTH(currently_attempted) >=
430                    GRPC_SLICE_LENGTH(write_buf_));
431         ares_ssize_t total_sent = 0;
432         for (size_t i = 0; i < GRPC_SLICE_LENGTH(write_buf_); i++) {
433           GPR_ASSERT(GRPC_SLICE_START_PTR(currently_attempted)[i] ==
434                      GRPC_SLICE_START_PTR(write_buf_)[i]);
435           total_sent++;
436         }
437         grpc_slice_unref_internal(currently_attempted);
438         tcp_write_state_ = WRITE_IDLE;
439         return total_sent;
440     }
441     abort();
442   }
443
444   static void OnTcpConnectLocked(void* arg, grpc_error* error) {
445     GrpcPolledFdWindows* grpc_polled_fd =
446         static_cast<GrpcPolledFdWindows*>(arg);
447     grpc_polled_fd->InnerOnTcpConnectLocked(error);
448   }
449
450   void InnerOnTcpConnectLocked(grpc_error* error) {
451     GRPC_CARES_TRACE_LOG(
452         "fd:%s InnerOnTcpConnectLocked error:|%s| "
453         "pending_register_for_readable:%" PRIdPTR
454         " pending_register_for_writeable:%" PRIdPTR,
455         GetName(), grpc_error_string(error),
456         pending_continue_register_for_on_readable_locked_,
457         pending_continue_register_for_on_writeable_locked_);
458     GPR_ASSERT(!connect_done_);
459     connect_done_ = true;
460     GPR_ASSERT(wsa_connect_error_ == 0);
461     if (error == GRPC_ERROR_NONE) {
462       DWORD transferred_bytes = 0;
463       DWORD flags;
464       BOOL wsa_success =
465           WSAGetOverlappedResult(grpc_winsocket_wrapped_socket(winsocket_),
466                                  &winsocket_->write_info.overlapped,
467                                  &transferred_bytes, FALSE, &flags);
468       GPR_ASSERT(transferred_bytes == 0);
469       if (!wsa_success) {
470         wsa_connect_error_ = WSAGetLastError();
471         char* msg = gpr_format_message(wsa_connect_error_);
472         GRPC_CARES_TRACE_LOG(
473             "fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
474             "msg:|%s|",
475             GetName(), wsa_connect_error_, msg);
476         gpr_free(msg);
477       }
478     } else {
479       // Spoof up an error code that will cause any future c-ares operations on
480       // this fd to abort.
481       wsa_connect_error_ = WSA_OPERATION_ABORTED;
482     }
483     if (pending_continue_register_for_on_readable_locked_ != nullptr) {
484       GRPC_CLOSURE_SCHED(pending_continue_register_for_on_readable_locked_,
485                          GRPC_ERROR_NONE);
486     }
487     if (pending_continue_register_for_on_writeable_locked_ != nullptr) {
488       GRPC_CLOSURE_SCHED(pending_continue_register_for_on_writeable_locked_,
489                          GRPC_ERROR_NONE);
490     }
491   }
492
493   int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
494               ares_socklen_t target_len) {
495     switch (socket_type_) {
496       case SOCK_DGRAM:
497         return ConnectUDP(wsa_error_ctx, target, target_len);
498       case SOCK_STREAM:
499         return ConnectTCP(wsa_error_ctx, target, target_len);
500       default:
501         abort();
502     }
503   }
504
505   int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
506                  ares_socklen_t target_len) {
507     GRPC_CARES_TRACE_LOG("fd:%s ConnectUDP", GetName());
508     GPR_ASSERT(!connect_done_);
509     GPR_ASSERT(wsa_connect_error_ == 0);
510     SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
511     int out =
512         WSAConnect(s, target, target_len, nullptr, nullptr, nullptr, nullptr);
513     wsa_connect_error_ = WSAGetLastError();
514     wsa_error_ctx->SetWSAError(wsa_connect_error_);
515     connect_done_ = true;
516     char* msg = gpr_format_message(wsa_connect_error_);
517     GRPC_CARES_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|", GetName(),
518                          wsa_connect_error_, msg);
519     gpr_free(msg);
520     // c-ares expects a posix-style connect API
521     return out == 0 ? 0 : -1;
522   }
523
524   int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
525                  ares_socklen_t target_len) {
526     GRPC_CARES_TRACE_LOG("fd:%s ConnectTCP", GetName());
527     LPFN_CONNECTEX ConnectEx;
528     GUID guid = WSAID_CONNECTEX;
529     DWORD ioctl_num_bytes;
530     SOCKET s = grpc_winsocket_wrapped_socket(winsocket_);
531     if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
532                  &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, nullptr,
533                  nullptr) != 0) {
534       int wsa_last_error = WSAGetLastError();
535       wsa_error_ctx->SetWSAError(wsa_last_error);
536       char* msg = gpr_format_message(wsa_last_error);
537       GRPC_CARES_TRACE_LOG(
538           "fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
539           "msg:|%s|",
540           GetName(), wsa_last_error, msg);
541       gpr_free(msg);
542       connect_done_ = true;
543       wsa_connect_error_ = wsa_last_error;
544       return -1;
545     }
546     grpc_resolved_address wildcard4_addr;
547     grpc_resolved_address wildcard6_addr;
548     grpc_sockaddr_make_wildcards(0, &wildcard4_addr, &wildcard6_addr);
549     grpc_resolved_address* local_address = nullptr;
550     if (address_family_ == AF_INET) {
551       local_address = &wildcard4_addr;
552     } else {
553       local_address = &wildcard6_addr;
554     }
555     if (bind(s, (struct sockaddr*)local_address->addr,
556              (int)local_address->len) != 0) {
557       int wsa_last_error = WSAGetLastError();
558       wsa_error_ctx->SetWSAError(wsa_last_error);
559       char* msg = gpr_format_message(wsa_last_error);
560       GRPC_CARES_TRACE_LOG("fd:%s bind error code:%d msg:|%s|", GetName(),
561                            wsa_last_error, msg);
562       gpr_free(msg);
563       connect_done_ = true;
564       wsa_connect_error_ = wsa_last_error;
565       return -1;
566     }
567     int out = 0;
568     if (ConnectEx(s, target, target_len, nullptr, 0, nullptr,
569                   &winsocket_->write_info.overlapped) == 0) {
570       out = -1;
571       int wsa_last_error = WSAGetLastError();
572       wsa_error_ctx->SetWSAError(wsa_last_error);
573       char* msg = gpr_format_message(wsa_last_error);
574       GRPC_CARES_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|", GetName(),
575                            wsa_last_error, msg);
576       gpr_free(msg);
577       if (wsa_last_error == WSA_IO_PENDING) {
578         // c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
579         // connect, but an async connect on IOCP socket will give
580         // WSA_IO_PENDING, so we need to convert.
581         wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
582       } else {
583         // By returning a non-retryable error to c-ares at this point,
584         // we're aborting the possibility of any future operations on this fd.
585         connect_done_ = true;
586         wsa_connect_error_ = wsa_last_error;
587         return -1;
588       }
589     }
590     grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
591     return out;
592   }
593
594   static void OnIocpReadable(void* arg, grpc_error* error) {
595     GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
596     polled_fd->OnIocpReadableInner(error);
597   }
598
599   // TODO(apolcyn): improve this error handling to be less conversative.
600   // An e.g. ECONNRESET error here should result in errors when
601   // c-ares reads from this socket later, but it shouldn't necessarily cancel
602   // the entire resolution attempt. Doing so will allow the "inject broken
603   // nameserver list" test to pass on Windows.
604   void OnIocpReadableInner(grpc_error* error) {
605     if (error == GRPC_ERROR_NONE) {
606       if (winsocket_->read_info.wsa_error != 0) {
607         /* WSAEMSGSIZE would be due to receiving more data
608          * than our read buffer's fixed capacity. Assume that
609          * the connection is TCP and read the leftovers
610          * in subsequent c-ares reads. */
611         if (winsocket_->read_info.wsa_error != WSAEMSGSIZE) {
612           GRPC_ERROR_UNREF(error);
613           error = GRPC_WSA_ERROR(winsocket_->read_info.wsa_error,
614                                  "OnIocpReadableInner");
615           GRPC_CARES_TRACE_LOG(
616               "fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
617               "code:|%d| msg:|%s|",
618               GetName(), winsocket_->read_info.wsa_error,
619               grpc_error_string(error));
620         }
621       }
622     }
623     if (error == GRPC_ERROR_NONE) {
624       read_buf_ = grpc_slice_sub_no_ref(
625           read_buf_, 0, winsocket_->read_info.bytes_transferred);
626       read_buf_has_data_ = true;
627     } else {
628       grpc_slice_unref_internal(read_buf_);
629       read_buf_ = grpc_empty_slice();
630     }
631     GRPC_CARES_TRACE_LOG(
632         "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
633         GRPC_SLICE_LENGTH(read_buf_));
634     ScheduleAndNullReadClosure(error);
635   }
636
637   static void OnIocpWriteable(void* arg, grpc_error* error) {
638     GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
639     polled_fd->OnIocpWriteableInner(error);
640   }
641
642   void OnIocpWriteableInner(grpc_error* error) {
643     GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
644     GPR_ASSERT(socket_type_ == SOCK_STREAM);
645     if (error == GRPC_ERROR_NONE) {
646       if (winsocket_->write_info.wsa_error != 0) {
647         GRPC_ERROR_UNREF(error);
648         error = GRPC_WSA_ERROR(winsocket_->write_info.wsa_error,
649                                "OnIocpWriteableInner");
650         GRPC_CARES_TRACE_LOG(
651             "fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
652             "code:|%d| msg:|%s|",
653             GetName(), winsocket_->write_info.wsa_error,
654             grpc_error_string(error));
655       }
656     }
657     GPR_ASSERT(tcp_write_state_ == WRITE_PENDING);
658     if (error == GRPC_ERROR_NONE) {
659       tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
660       write_buf_ = grpc_slice_sub_no_ref(
661           write_buf_, 0, winsocket_->write_info.bytes_transferred);
662       GRPC_CARES_TRACE_LOG("fd:|%s| OnIocpWriteableInner. bytes transferred:%d",
663                            GetName(), winsocket_->write_info.bytes_transferred);
664     } else {
665       grpc_slice_unref_internal(write_buf_);
666       write_buf_ = grpc_empty_slice();
667     }
668     ScheduleAndNullWriteClosure(error);
669   }
670
671   bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
672   void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
673
674   grpc_combiner* combiner_;
675   char recv_from_source_addr_[200];
676   ares_socklen_t recv_from_source_addr_len_;
677   grpc_slice read_buf_;
678   bool read_buf_has_data_ = false;
679   grpc_slice write_buf_;
680   grpc_closure* read_closure_ = nullptr;
681   grpc_closure* write_closure_ = nullptr;
682   grpc_closure outer_read_closure_;
683   grpc_closure outer_write_closure_;
684   grpc_winsocket* winsocket_;
685   // tcp_write_state_ is only used on TCP GrpcPolledFds
686   WriteState tcp_write_state_;
687   char* name_ = nullptr;
688   bool gotten_into_driver_list_;
689   int address_family_;
690   int socket_type_;
691   grpc_closure on_tcp_connect_locked_;
692   bool connect_done_ = false;
693   int wsa_connect_error_ = 0;
694   // We don't run register_for_{readable,writeable} logic until
695   // a socket is connected. In the interim, we queue readable/writeable
696   // registrations with the following state.
697   grpc_closure continue_register_for_on_readable_locked_;
698   grpc_closure continue_register_for_on_writeable_locked_;
699   grpc_closure* pending_continue_register_for_on_readable_locked_ = nullptr;
700   grpc_closure* pending_continue_register_for_on_writeable_locked_ = nullptr;
701 };
702
703 struct SockToPolledFdEntry {
704   SockToPolledFdEntry(SOCKET s, GrpcPolledFdWindows* fd)
705       : socket(s), polled_fd(fd) {}
706   SOCKET socket;
707   GrpcPolledFdWindows* polled_fd;
708   SockToPolledFdEntry* next = nullptr;
709 };
710
/* A SockToPolledFdMap maps ares_socket_t values (SOCKETs on Windows)
 * to GrpcPolledFdWindows objects, and is used to find the appropriate
 * GrpcPolledFdWindows to handle a virtual socket call when c-ares makes that
 * socket call on the ares_socket_t type. Instances are owned by, and are
 * one-to-one with, a GrpcPolledFdWindows factory and event driver. */
716 class SockToPolledFdMap {
717  public:
718   SockToPolledFdMap(grpc_combiner* combiner) {
719     combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
720   }
721
722   ~SockToPolledFdMap() {
723     GPR_ASSERT(head_ == nullptr);
724     GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
725   }
726
727   void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
728     SockToPolledFdEntry* new_node = New<SockToPolledFdEntry>(s, polled_fd);
729     new_node->next = head_;
730     head_ = new_node;
731   }
732
733   GrpcPolledFdWindows* LookupPolledFd(SOCKET s) {
734     for (SockToPolledFdEntry* node = head_; node != nullptr;
735          node = node->next) {
736       if (node->socket == s) {
737         GPR_ASSERT(node->polled_fd != nullptr);
738         return node->polled_fd;
739       }
740     }
741     abort();
742   }
743
744   void RemoveEntry(SOCKET s) {
745     GPR_ASSERT(head_ != nullptr);
746     SockToPolledFdEntry** prev = &head_;
747     for (SockToPolledFdEntry* node = head_; node != nullptr;
748          node = node->next) {
749       if (node->socket == s) {
750         *prev = node->next;
751         Delete(node);
752         return;
753       }
754       prev = &node->next;
755     }
756     abort();
757   }
758
759   /* These virtual socket functions are called from within the c-ares
760    * library. These methods generally dispatch those socket calls to the
761    * appropriate methods. The virtual "socket" and "close" methods are
762    * special and instead create/add and remove/destroy GrpcPolledFdWindows
763    * objects.
764    */
765   static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
766     if (type != SOCK_DGRAM && type != SOCK_STREAM) {
767       GRPC_CARES_TRACE_LOG("Socket called with invalid socket type:%d", type);
768       return INVALID_SOCKET;
769     }
770     SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
771     SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
772                          grpc_get_default_wsa_socket_flags());
773     if (s == INVALID_SOCKET) {
774       GRPC_CARES_TRACE_LOG(
775           "WSASocket failed with params af:%d type:%d protocol:%d", af, type,
776           protocol);
777       return s;
778     }
779     grpc_tcp_set_non_block(s);
780     GrpcPolledFdWindows* polled_fd =
781         New<GrpcPolledFdWindows>(s, map->combiner_, af, type);
782     GRPC_CARES_TRACE_LOG(
783         "fd:|%s| created with params af:%d type:%d protocol:%d",
784         polled_fd->GetName(), af, type, protocol);
785     map->AddNewSocket(s, polled_fd);
786     return s;
787   }
788
789   static int Connect(ares_socket_t as, const struct sockaddr* target,
790                      ares_socklen_t target_len, void* user_data) {
791     WSAErrorContext wsa_error_ctx;
792     SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
793     GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
794     return polled_fd->Connect(&wsa_error_ctx, target, target_len);
795   }
796
797   static ares_ssize_t SendV(ares_socket_t as, const struct iovec* iov,
798                             int iovec_count, void* user_data) {
799     WSAErrorContext wsa_error_ctx;
800     SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
801     GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
802     return polled_fd->SendV(&wsa_error_ctx, iov, iovec_count);
803   }
804
805   static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
806                                int flags, struct sockaddr* from,
807                                ares_socklen_t* from_len, void* user_data) {
808     WSAErrorContext wsa_error_ctx;
809     SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
810     GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(as);
811     return polled_fd->RecvFrom(&wsa_error_ctx, data, data_len, flags, from,
812                                from_len);
813   }
814
815   static int CloseSocket(SOCKET s, void* user_data) {
816     SockToPolledFdMap* map = static_cast<SockToPolledFdMap*>(user_data);
817     GrpcPolledFdWindows* polled_fd = map->LookupPolledFd(s);
818     map->RemoveEntry(s);
819     // If a gRPC polled fd has not made it in to the driver's list yet, then
820     // the driver has not and will never see this socket.
821     if (!polled_fd->gotten_into_driver_list()) {
822       polled_fd->ShutdownLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
823           "Shut down c-ares fd before without it ever having made it into the "
824           "driver's list"));
825       return 0;
826     }
827     return 0;
828   }
829
830  private:
831   SockToPolledFdEntry* head_ = nullptr;
832   grpc_combiner* combiner_;
833 };
834
835 const struct ares_socket_functions custom_ares_sock_funcs = {
836     &SockToPolledFdMap::Socket /* socket */,
837     &SockToPolledFdMap::CloseSocket /* close */,
838     &SockToPolledFdMap::Connect /* connect */,
839     &SockToPolledFdMap::RecvFrom /* recvfrom */,
840     &SockToPolledFdMap::SendV /* sendv */,
841 };
842
843 class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
844  public:
845   GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
846       : sock_to_polled_fd_map_(combiner) {}
847
848   GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
849                                       grpc_pollset_set* driver_pollset_set,
850                                       grpc_combiner* combiner) override {
851     GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
852     // Set a flag so that the virtual socket "close" method knows it
853     // doesn't need to call ShutdownLocked, since now the driver will.
854     polled_fd->set_gotten_into_driver_list();
855     return polled_fd;
856   }
857
858   void ConfigureAresChannelLocked(ares_channel channel) override {
859     ares_set_socket_functions(channel, &custom_ares_sock_funcs,
860                               &sock_to_polled_fd_map_);
861   }
862
863  private:
864   SockToPolledFdMap sock_to_polled_fd_map_;
865 };
866
867 UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
868   return UniquePtr<GrpcPolledFdFactory>(
869       New<GrpcPolledFdFactoryWindows>(combiner));
870 }
871
872 }  // namespace grpc_core
873
874 #endif /* GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) */