/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
   using that endpoint. Because of various transitive includes in uv.h,
   including windows.h on Windows, uv.h must be included before other system
   headers. Therefore, sockaddr.h must always be included first */
23 #include <grpc/support/port_platform.h>
27 #include "src/core/lib/iomgr/sockaddr.h"
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/memory.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/security/transport/secure_endpoint.h"
39 #include "src/core/lib/security/transport/tsi_error.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/tsi/transport_security_grpc.h"
44 #define STAGING_BUFFER_SIZE 8192
46 static void on_read(void* user_data, grpc_error* error);
49 struct secure_endpoint {
50 secure_endpoint(const grpc_endpoint_vtable* vtable,
51 tsi_frame_protector* protector,
52 tsi_zero_copy_grpc_protector* zero_copy_protector,
53 grpc_endpoint* transport, grpc_slice* leftover_slices,
54 size_t leftover_nslices)
55 : wrapped_ep(transport),
57 zero_copy_protector(zero_copy_protector) {
59 gpr_mu_init(&protector_mu);
60 GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
61 grpc_slice_buffer_init(&source_buffer);
62 grpc_slice_buffer_init(&leftover_bytes);
63 for (size_t i = 0; i < leftover_nslices; i++) {
64 grpc_slice_buffer_add(&leftover_bytes,
65 grpc_slice_ref_internal(leftover_slices[i]));
67 grpc_slice_buffer_init(&output_buffer);
68 gpr_ref_init(&ref, 1);
72 grpc_endpoint_destroy(wrapped_ep);
73 tsi_frame_protector_destroy(protector);
74 tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
75 grpc_slice_buffer_destroy_internal(&source_buffer);
76 grpc_slice_buffer_destroy_internal(&leftover_bytes);
77 grpc_slice_unref_internal(read_staging_buffer);
78 grpc_slice_unref_internal(write_staging_buffer);
79 grpc_slice_buffer_destroy_internal(&output_buffer);
80 gpr_mu_destroy(&protector_mu);
84 grpc_endpoint* wrapped_ep;
85 struct tsi_frame_protector* protector;
86 struct tsi_zero_copy_grpc_protector* zero_copy_protector;
88 /* saved upper level callbacks and user_data. */
89 grpc_closure* read_cb = nullptr;
90 grpc_closure* write_cb = nullptr;
92 grpc_slice_buffer* read_buffer = nullptr;
93 grpc_slice_buffer source_buffer;
94 /* saved handshaker leftover data to unprotect. */
95 grpc_slice_buffer leftover_bytes;
96 /* buffers for read and write */
97 grpc_slice read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
98 grpc_slice write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
99 grpc_slice_buffer output_buffer;
105 grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
107 static void destroy(secure_endpoint* ep) { grpc_core::Delete(ep); }
110 #define SECURE_ENDPOINT_UNREF(ep, reason) \
111 secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
112 #define SECURE_ENDPOINT_REF(ep, reason) \
113 secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
114 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
115 const char* file, int line) {
116 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
117 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
118 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
119 "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
122 if (gpr_unref(&ep->ref)) {
127 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
128 const char* file, int line) {
129 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
130 gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
131 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
132 "SECENDP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
138 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
139 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
140 static void secure_endpoint_unref(secure_endpoint* ep) {
141 if (gpr_unref(&ep->ref)) {
146 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
149 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
151 grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
152 ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
153 *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
154 *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
157 static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
158 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
160 for (i = 0; i < ep->read_buffer->count; i++) {
161 char* data = grpc_dump_slice(ep->read_buffer->slices[i],
162 GPR_DUMP_HEX | GPR_DUMP_ASCII);
163 gpr_log(GPR_INFO, "READ %p: %s", ep, data);
167 ep->read_buffer = nullptr;
168 GRPC_CLOSURE_SCHED(ep->read_cb, error);
169 SECURE_ENDPOINT_UNREF(ep, "read");
172 static void on_read(void* user_data, grpc_error* error) {
174 uint8_t keep_looping = 0;
175 tsi_result result = TSI_OK;
176 secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
177 uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
178 uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
180 if (error != GRPC_ERROR_NONE) {
181 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
182 call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
183 "Secure read failed", &error, 1));
187 if (ep->zero_copy_protector != nullptr) {
188 // Use zero-copy grpc protector to unprotect.
189 result = tsi_zero_copy_grpc_protector_unprotect(
190 ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
192 // Use frame protector to unprotect.
193 /* TODO(yangg) check error, maybe bail out early */
194 for (i = 0; i < ep->source_buffer.count; i++) {
195 grpc_slice encrypted = ep->source_buffer.slices[i];
196 uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
197 size_t message_size = GRPC_SLICE_LENGTH(encrypted);
199 while (message_size > 0 || keep_looping) {
200 size_t unprotected_buffer_size_written = static_cast<size_t>(end - cur);
201 size_t processed_message_size = message_size;
202 gpr_mu_lock(&ep->protector_mu);
203 result = tsi_frame_protector_unprotect(
204 ep->protector, message_bytes, &processed_message_size, cur,
205 &unprotected_buffer_size_written);
206 gpr_mu_unlock(&ep->protector_mu);
207 if (result != TSI_OK) {
208 gpr_log(GPR_ERROR, "Decryption error: %s",
209 tsi_result_to_string(result));
212 message_bytes += processed_message_size;
213 message_size -= processed_message_size;
214 cur += unprotected_buffer_size_written;
217 flush_read_staging_buffer(ep, &cur, &end);
218 /* Force to enter the loop again to extract buffered bytes in
219 protector. The bytes could be buffered because of running out of
220 staging_buffer. If this happens at the end of all slices, doing
221 another unprotect avoids leaving data in the protector. */
223 } else if (unprotected_buffer_size_written > 0) {
229 if (result != TSI_OK) break;
232 if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
233 grpc_slice_buffer_add(
235 grpc_slice_split_head(
236 &ep->read_staging_buffer,
238 cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
242 /* TODO(yangg) experiment with moving this block after read_cb to see if it
244 grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer);
246 if (result != TSI_OK) {
247 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
249 ep, grpc_set_tsi_error_result(
250 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result));
254 call_read_cb(ep, GRPC_ERROR_NONE);
257 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
258 grpc_closure* cb, bool urgent) {
259 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
261 ep->read_buffer = slices;
262 grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer);
264 SECURE_ENDPOINT_REF(ep, "read");
265 if (ep->leftover_bytes.count) {
266 grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
267 GPR_ASSERT(ep->leftover_bytes.count == 0);
268 on_read(ep, GRPC_ERROR_NONE);
272 grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent);
275 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
277 grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
278 ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE);
279 *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
280 *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
283 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
284 grpc_closure* cb, void* arg) {
285 GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
288 tsi_result result = TSI_OK;
289 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
290 uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
291 uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
293 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
295 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
296 for (i = 0; i < slices->count; i++) {
298 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
299 gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
304 if (ep->zero_copy_protector != nullptr) {
305 // Use zero-copy grpc protector to protect.
306 result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
307 slices, &ep->output_buffer);
309 // Use frame protector to protect.
310 for (i = 0; i < slices->count; i++) {
311 grpc_slice plain = slices->slices[i];
312 uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
313 size_t message_size = GRPC_SLICE_LENGTH(plain);
314 while (message_size > 0) {
315 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
316 size_t processed_message_size = message_size;
317 gpr_mu_lock(&ep->protector_mu);
318 result = tsi_frame_protector_protect(ep->protector, message_bytes,
319 &processed_message_size, cur,
320 &protected_buffer_size_to_send);
321 gpr_mu_unlock(&ep->protector_mu);
322 if (result != TSI_OK) {
323 gpr_log(GPR_ERROR, "Encryption error: %s",
324 tsi_result_to_string(result));
327 message_bytes += processed_message_size;
328 message_size -= processed_message_size;
329 cur += protected_buffer_size_to_send;
332 flush_write_staging_buffer(ep, &cur, &end);
335 if (result != TSI_OK) break;
337 if (result == TSI_OK) {
338 size_t still_pending_size;
340 size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
341 gpr_mu_lock(&ep->protector_mu);
342 result = tsi_frame_protector_protect_flush(
343 ep->protector, cur, &protected_buffer_size_to_send,
344 &still_pending_size);
345 gpr_mu_unlock(&ep->protector_mu);
346 if (result != TSI_OK) break;
347 cur += protected_buffer_size_to_send;
349 flush_write_staging_buffer(ep, &cur, &end);
351 } while (still_pending_size > 0);
352 if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
353 grpc_slice_buffer_add(
355 grpc_slice_split_head(
356 &ep->write_staging_buffer,
358 cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
363 if (result != TSI_OK) {
364 /* TODO(yangg) do different things according to the error type? */
365 grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
367 cb, grpc_set_tsi_error_result(
368 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
372 grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
375 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
376 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
377 grpc_endpoint_shutdown(ep->wrapped_ep, why);
380 static void endpoint_destroy(grpc_endpoint* secure_ep) {
381 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
382 SECURE_ENDPOINT_UNREF(ep, "destroy");
385 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
386 grpc_pollset* pollset) {
387 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
388 grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
391 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
392 grpc_pollset_set* pollset_set) {
393 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
394 grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
397 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
398 grpc_pollset_set* pollset_set) {
399 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
400 grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
403 static char* endpoint_get_peer(grpc_endpoint* secure_ep) {
404 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
405 return grpc_endpoint_get_peer(ep->wrapped_ep);
408 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
409 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
410 return grpc_endpoint_get_fd(ep->wrapped_ep);
413 static grpc_resource_user* endpoint_get_resource_user(
414 grpc_endpoint* secure_ep) {
415 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
416 return grpc_endpoint_get_resource_user(ep->wrapped_ep);
419 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
420 secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
421 return grpc_endpoint_can_track_err(ep->wrapped_ep);
424 static const grpc_endpoint_vtable vtable = {endpoint_read,
426 endpoint_add_to_pollset,
427 endpoint_add_to_pollset_set,
428 endpoint_delete_from_pollset_set,
431 endpoint_get_resource_user,
434 endpoint_can_track_err};
436 grpc_endpoint* grpc_secure_endpoint_create(
437 struct tsi_frame_protector* protector,
438 struct tsi_zero_copy_grpc_protector* zero_copy_protector,
439 grpc_endpoint* transport, grpc_slice* leftover_slices,
440 size_t leftover_nslices) {
441 secure_endpoint* ep = grpc_core::New<secure_endpoint>(
442 &vtable, protector, zero_copy_protector, transport, leftover_slices,