Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / security / transport / secure_endpoint.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include <grpc/support/port_platform.h>
24
25 #include <new>
26
27 #include "src/core/lib/iomgr/sockaddr.h"
28
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/memory.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/security/transport/secure_endpoint.h"
39 #include "src/core/lib/security/transport/tsi_error.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/tsi/transport_security_grpc.h"
43
44 #define STAGING_BUFFER_SIZE 8192
45
46 static void on_read(void* user_data, grpc_error* error);
47
48 namespace {
49 struct secure_endpoint {
50   secure_endpoint(const grpc_endpoint_vtable* vtable,
51                   tsi_frame_protector* protector,
52                   tsi_zero_copy_grpc_protector* zero_copy_protector,
53                   grpc_endpoint* transport, grpc_slice* leftover_slices,
54                   size_t leftover_nslices)
55       : wrapped_ep(transport),
56         protector(protector),
57         zero_copy_protector(zero_copy_protector) {
58     base.vtable = vtable;
59     gpr_mu_init(&protector_mu);
60     GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
61     grpc_slice_buffer_init(&source_buffer);
62     grpc_slice_buffer_init(&leftover_bytes);
63     for (size_t i = 0; i < leftover_nslices; i++) {
64       grpc_slice_buffer_add(&leftover_bytes,
65                             grpc_slice_ref_internal(leftover_slices[i]));
66     }
67     grpc_slice_buffer_init(&output_buffer);
68     gpr_ref_init(&ref, 1);
69   }
70
71   ~secure_endpoint() {
72     grpc_endpoint_destroy(wrapped_ep);
73     tsi_frame_protector_destroy(protector);
74     tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
75     grpc_slice_buffer_destroy_internal(&source_buffer);
76     grpc_slice_buffer_destroy_internal(&leftover_bytes);
77     grpc_slice_unref_internal(read_staging_buffer);
78     grpc_slice_unref_internal(write_staging_buffer);
79     grpc_slice_buffer_destroy_internal(&output_buffer);
80     gpr_mu_destroy(&protector_mu);
81   }
82
83   grpc_endpoint base;
84   grpc_endpoint* wrapped_ep;
85   struct tsi_frame_protector* protector;
86   struct tsi_zero_copy_grpc_protector* zero_copy_protector;
87   gpr_mu protector_mu;
88   /* saved upper level callbacks and user_data. */
89   grpc_closure* read_cb = nullptr;
90   grpc_closure* write_cb = nullptr;
91   grpc_closure on_read;
92   grpc_slice_buffer* read_buffer = nullptr;
93   grpc_slice_buffer source_buffer;
94   /* saved handshaker leftover data to unprotect. */
95   grpc_slice_buffer leftover_bytes;
96   /* buffers for read and write */
97   grpc_slice read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
98   grpc_slice write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
99   grpc_slice_buffer output_buffer;
100
101   gpr_refcount ref;
102 };
103 }  // namespace
104
105 grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
106
107 static void destroy(secure_endpoint* ep) { grpc_core::Delete(ep); }
108
109 #ifndef NDEBUG
110 #define SECURE_ENDPOINT_UNREF(ep, reason) \
111   secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
112 #define SECURE_ENDPOINT_REF(ep, reason) \
113   secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
114 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
115                                   const char* file, int line) {
116   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
117     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
118     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
119             "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
120             val - 1);
121   }
122   if (gpr_unref(&ep->ref)) {
123     destroy(ep);
124   }
125 }
126
127 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
128                                 const char* file, int line) {
129   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
130     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
131     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
132             "SECENDP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
133             val + 1);
134   }
135   gpr_ref(&ep->ref);
136 }
137 #else
138 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
139 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
140 static void secure_endpoint_unref(secure_endpoint* ep) {
141   if (gpr_unref(&ep->ref)) {
142     destroy(ep);
143   }
144 }
145
146 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
147 #endif
148
149 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
150                                       uint8_t** end) {
151   grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
152   ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
153   *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
154   *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
155 }
156
157 static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
158   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
159     size_t i;
160     for (i = 0; i < ep->read_buffer->count; i++) {
161       char* data = grpc_dump_slice(ep->read_buffer->slices[i],
162                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
163       gpr_log(GPR_INFO, "READ %p: %s", ep, data);
164       gpr_free(data);
165     }
166   }
167   ep->read_buffer = nullptr;
168   GRPC_CLOSURE_SCHED(ep->read_cb, error);
169   SECURE_ENDPOINT_UNREF(ep, "read");
170 }
171
172 static void on_read(void* user_data, grpc_error* error) {
173   unsigned i;
174   uint8_t keep_looping = 0;
175   tsi_result result = TSI_OK;
176   secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
177   uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
178   uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
179
180   if (error != GRPC_ERROR_NONE) {
181     grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
182     call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
183                          "Secure read failed", &error, 1));
184     return;
185   }
186
187   if (ep->zero_copy_protector != nullptr) {
188     // Use zero-copy grpc protector to unprotect.
189     result = tsi_zero_copy_grpc_protector_unprotect(
190         ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
191   } else {
192     // Use frame protector to unprotect.
193     /* TODO(yangg) check error, maybe bail out early */
194     for (i = 0; i < ep->source_buffer.count; i++) {
195       grpc_slice encrypted = ep->source_buffer.slices[i];
196       uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
197       size_t message_size = GRPC_SLICE_LENGTH(encrypted);
198
199       while (message_size > 0 || keep_looping) {
200         size_t unprotected_buffer_size_written = static_cast<size_t>(end - cur);
201         size_t processed_message_size = message_size;
202         gpr_mu_lock(&ep->protector_mu);
203         result = tsi_frame_protector_unprotect(
204             ep->protector, message_bytes, &processed_message_size, cur,
205             &unprotected_buffer_size_written);
206         gpr_mu_unlock(&ep->protector_mu);
207         if (result != TSI_OK) {
208           gpr_log(GPR_ERROR, "Decryption error: %s",
209                   tsi_result_to_string(result));
210           break;
211         }
212         message_bytes += processed_message_size;
213         message_size -= processed_message_size;
214         cur += unprotected_buffer_size_written;
215
216         if (cur == end) {
217           flush_read_staging_buffer(ep, &cur, &end);
218           /* Force to enter the loop again to extract buffered bytes in
219              protector. The bytes could be buffered because of running out of
220              staging_buffer. If this happens at the end of all slices, doing
221              another unprotect avoids leaving data in the protector. */
222           keep_looping = 1;
223         } else if (unprotected_buffer_size_written > 0) {
224           keep_looping = 1;
225         } else {
226           keep_looping = 0;
227         }
228       }
229       if (result != TSI_OK) break;
230     }
231
232     if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
233       grpc_slice_buffer_add(
234           ep->read_buffer,
235           grpc_slice_split_head(
236               &ep->read_staging_buffer,
237               static_cast<size_t>(
238                   cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
239     }
240   }
241
242   /* TODO(yangg) experiment with moving this block after read_cb to see if it
243      helps latency */
244   grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
245
246   if (result != TSI_OK) {
247     grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
248     call_read_cb(
249         ep, grpc_set_tsi_error_result(
250                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
251     return;
252   }
253
254   call_read_cb(ep, GRPC_ERROR_NONE);
255 }
256
257 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
258                           grpc_closure* cb, bool urgent) {
259   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
260   ep->read_cb = cb;
261   ep->read_buffer = slices;
262   grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
263
264   SECURE_ENDPOINT_REF(ep, "read");
265   if (ep->leftover_bytes.count) {
266     grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
267     GPR_ASSERT(ep->leftover_bytes.count == 0);
268     on_read(ep, GRPC_ERROR_NONE);
269     return;
270   }
271
272   grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
273 }
274
275 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
276                                        uint8_t** end) {
277   grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
278   ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
279   *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
280   *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
281 }
282
283 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
284                            grpc_closure* cb, void* arg) {
285   GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
286
287   unsigned i;
288   tsi_result result = TSI_OK;
289   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
290   uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
291   uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
292
293   grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
294
295   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
296     for (i = 0; i < slices->count; i++) {
297       char* data =
298           grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
299       gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
300       gpr_free(data);
301     }
302   }
303
304   if (ep->zero_copy_protector != nullptr) {
305     // Use zero-copy grpc protector to protect.
306     result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
307                                                   slices, &ep->output_buffer);
308   } else {
309     // Use frame protector to protect.
310     for (i = 0; i < slices->count; i++) {
311       grpc_slice plain = slices->slices[i];
312       uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
313       size_t message_size = GRPC_SLICE_LENGTH(plain);
314       while (message_size > 0) {
315         size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
316         size_t processed_message_size = message_size;
317         gpr_mu_lock(&ep->protector_mu);
318         result = tsi_frame_protector_protect(ep->protector, message_bytes,
319                                              &processed_message_size, cur,
320                                              &protected_buffer_size_to_send);
321         gpr_mu_unlock(&ep->protector_mu);
322         if (result != TSI_OK) {
323           gpr_log(GPR_ERROR, "Encryption error: %s",
324                   tsi_result_to_string(result));
325           break;
326         }
327         message_bytes += processed_message_size;
328         message_size -= processed_message_size;
329         cur += protected_buffer_size_to_send;
330
331         if (cur == end) {
332           flush_write_staging_buffer(ep, &cur, &end);
333         }
334       }
335       if (result != TSI_OK) break;
336     }
337     if (result == TSI_OK) {
338       size_t still_pending_size;
339       do {
340         size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
341         gpr_mu_lock(&ep->protector_mu);
342         result = tsi_frame_protector_protect_flush(
343             ep->protector, cur, &protected_buffer_size_to_send,
344             &still_pending_size);
345         gpr_mu_unlock(&ep->protector_mu);
346         if (result != TSI_OK) break;
347         cur += protected_buffer_size_to_send;
348         if (cur == end) {
349           flush_write_staging_buffer(ep, &cur, &end);
350         }
351       } while (still_pending_size > 0);
352       if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
353         grpc_slice_buffer_add(
354             &ep->output_buffer,
355             grpc_slice_split_head(
356                 &ep->write_staging_buffer,
357                 static_cast<size_t>(
358                     cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
359       }
360     }
361   }
362
363   if (result != TSI_OK) {
364     /* TODO(yangg) do different things according to the error type? */
365     grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
366     GRPC_CLOSURE_SCHED(
367         cb, grpc_set_tsi_error_result(
368                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
369     return;
370   }
371
372   grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
373 }
374
375 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
376   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
377   grpc_endpoint_shutdown(ep->wrapped_ep, why);
378 }
379
380 static void endpoint_destroy(grpc_endpoint* secure_ep) {
381   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
382   SECURE_ENDPOINT_UNREF(ep, "destroy");
383 }
384
385 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
386                                     grpc_pollset* pollset) {
387   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
388   grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
389 }
390
391 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
392                                         grpc_pollset_set* pollset_set) {
393   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
394   grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
395 }
396
397 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
398                                              grpc_pollset_set* pollset_set) {
399   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
400   grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
401 }
402
403 static char* endpoint_get_peer(grpc_endpoint* secure_ep) {
404   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
405   return grpc_endpoint_get_peer(ep->wrapped_ep);
406 }
407
408 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
409   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
410   return grpc_endpoint_get_fd(ep->wrapped_ep);
411 }
412
413 static grpc_resource_user* endpoint_get_resource_user(
414     grpc_endpoint* secure_ep) {
415   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
416   return grpc_endpoint_get_resource_user(ep->wrapped_ep);
417 }
418
419 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
420   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
421   return grpc_endpoint_can_track_err(ep->wrapped_ep);
422 }
423
424 static const grpc_endpoint_vtable vtable = {endpoint_read,
425                                             endpoint_write,
426                                             endpoint_add_to_pollset,
427                                             endpoint_add_to_pollset_set,
428                                             endpoint_delete_from_pollset_set,
429                                             endpoint_shutdown,
430                                             endpoint_destroy,
431                                             endpoint_get_resource_user,
432                                             endpoint_get_peer,
433                                             endpoint_get_fd,
434                                             endpoint_can_track_err};
435
436 grpc_endpoint* grpc_secure_endpoint_create(
437     struct tsi_frame_protector* protector,
438     struct tsi_zero_copy_grpc_protector* zero_copy_protector,
439     grpc_endpoint* transport, grpc_slice* leftover_slices,
440     size_t leftover_nslices) {
441   secure_endpoint* ep = grpc_core::New<secure_endpoint>(
442       &vtable, protector, zero_copy_protector, transport, leftover_slices,
443       leftover_nslices);
444   return &ep->base;
445 }