3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
/* Compile-time tunables for the chttp2 transport: flow-control window targets,
   write-buffer bounds, and keepalive/ping policy defaults.  The g_default_*
   globals seed per-transport settings and are presumably overridable by config
   setters defined elsewhere in this file — confirm against the full source. */
60 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
61 #define MAX_WINDOW 0x7fffffffu
62 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
63 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
/* INT_MAX keepalive time means "never send keepalive pings" on clients by
   default (mapped to GRPC_MILLIS_INF_FUTURE where these values are consumed). */
65 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
66 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
67 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */
68 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
69 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
70 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
72 #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
73 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
74 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
75 #define DEFAULT_MAX_PING_STRIKES 2
77 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
/* Mutable process-wide defaults, initialized from the macros above. */
79 static int g_default_client_keepalive_time_ms =
80 DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
81 static int g_default_client_keepalive_timeout_ms =
82 DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
83 static int g_default_server_keepalive_time_ms =
84 DEFAULT_SERVER_KEEPALIVE_TIME_MS;
85 static int g_default_server_keepalive_timeout_ms =
86 DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
87 static bool g_default_client_keepalive_permit_without_calls =
88 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
89 static bool g_default_server_keepalive_permit_without_calls =
90 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
92 static int g_default_min_sent_ping_interval_without_data_ms =
93 DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
94 static int g_default_min_recv_ping_interval_without_data_ms =
95 DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
96 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
97 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
/* Highest stream id a client may initiate (31-bit HTTP/2 stream id space). */
99 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
/* Trace flags: enable with GRPC_TRACE=http (refcount tracing is debug-only).
   NOTE(review): the second trace-flag declaration is truncated in this
   extract — its closing argument/paren is on a missing line. */
100 grpc_core::TraceFlag grpc_http_trace(false, "http");
101 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
104 /* forward declarations of various callbacks that we'll build closures around */
105 static void write_action_begin_locked(void* t, grpc_error* error);
106 static void write_action(void* t, grpc_error* error);
107 static void write_action_end_locked(void* t, grpc_error* error);
109 static void read_action_locked(void* t, grpc_error* error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
112 static void complete_fetch_locked(void* gs, grpc_error* error);
113 /** Set a transport level setting, and push it to our peer */
114 static void queue_setting_update(grpc_chttp2_transport* t,
115 grpc_chttp2_setting_id id, uint32_t value);
117 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
120 /** Start new streams that have been created if we can */
121 static void maybe_start_some_streams(grpc_chttp2_transport* t);
123 static void connectivity_state_set(grpc_chttp2_transport* t,
124 grpc_connectivity_state state,
127 static void benign_reclaimer_locked(void* t, grpc_error* error);
128 static void destructive_reclaimer_locked(void* t, grpc_error* error);
130 static void post_benign_reclaimer(grpc_chttp2_transport* t);
131 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
133 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
134 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
136 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
137 static void start_bdp_ping_locked(void* tp, grpc_error* error);
138 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
139 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
141 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
142 static void send_ping_locked(grpc_chttp2_transport* t,
143 grpc_closure* on_initiate,
144 grpc_closure* on_complete);
145 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
147 /** keepalive-relevant functions */
148 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
149 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
150 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
151 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
153 static void reset_byte_stream(void* arg, grpc_error* error);
155 // Flow control default enabled. Can be disabled by setting
156 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
157 bool g_flow_control_enabled = true;
159 /*******************************************************************************
160 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
/* Transport destructor: tears down the endpoint, slice buffers, HPACK
   compressor/parser, goaway parser, stream map, connectivity tracker, the
   combiner ref, and the pooled write callbacks, then releases owned errors
   and the peer string.
   NOTE(review): this extract is missing interior lines (non-contiguous
   original numbering) — declarations such as the loop index `i` and the
   `grpc_error* error =` assignment feeding ContextList::Execute, plus several
   closing braces, are elided. */
163 grpc_chttp2_transport::~grpc_chttp2_transport() {
166 if (channelz_socket != nullptr) {
167 channelz_socket.reset();
170 grpc_endpoint_destroy(ep);
172 grpc_slice_buffer_destroy_internal(&qbuf);
174 grpc_slice_buffer_destroy_internal(&outbuf);
175 grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
178 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
179 // ContextList::Execute follows semantics of a callback function and does not
180 // take a ref on error
181 grpc_core::ContextList::Execute(cl, nullptr, error);
182 GRPC_ERROR_UNREF(error);
185 grpc_slice_buffer_destroy_internal(&read_buffer);
186 grpc_chttp2_hpack_parser_destroy(&hpack_parser);
187 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
/* Every stream list must already be empty — streams hold transport refs, so a
   non-empty list here would mean the destructor ran too early. */
189 for (i = 0; i < STREAM_LIST_COUNT; i++) {
190 GPR_ASSERT(lists[i].head == nullptr);
191 GPR_ASSERT(lists[i].tail == nullptr);
194 GRPC_ERROR_UNREF(goaway_error);
196 GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
198 grpc_chttp2_stream_map_destroy(&stream_map);
199 grpc_connectivity_state_destroy(&channel_callback.state_tracker);
201 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
204 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
/* Drain the freelist of recycled write callbacks. */
206 while (write_cb_pool) {
207 grpc_chttp2_write_cb* next = write_cb_pool->next;
208 gpr_free(write_cb_pool);
209 write_cb_pool = next;
212 flow_control.Destroy();
214 GRPC_ERROR_UNREF(closed_with_error);
216 gpr_free(peer_string);
219 static const grpc_transport_vtable* get_vtable(void);
221 /* Returns whether bdp is enabled */
/* Applies channel args to a freshly constructed transport: stream-id seeding,
   HPACK encoder sizing, ping/keepalive policy, write buffering, optimization
   target, channelz, and a table-driven mapping of channel args onto HTTP/2
   SETTINGS.  Returns whether BDP probing is enabled (per the comment on the
   preceding line).
   NOTE(review): many `else if` headers, closing braces, and option upper
   bounds are on lines elided from this extract. */
222 static bool read_channel_args(grpc_chttp2_transport* t,
223 const grpc_channel_args* channel_args,
225 bool enable_bdp = true;
226 bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
230 for (i = 0; i < channel_args->num_args; i++) {
231 if (0 == strcmp(channel_args->args[i].key,
232 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
233 const grpc_integer_options options = {-1, 0, INT_MAX};
235 grpc_channel_arg_get_integer(&channel_args->args[i], options);
/* Stream-id parity is fixed by HTTP/2: clients use odd ids, servers even —
   reject a requested initial id with the wrong low bit. */
237 if ((t->next_stream_id & 1) != (value & 1)) {
238 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
239 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
240 is_client ? "client" : "server");
242 t->next_stream_id = static_cast<uint32_t>(value);
245 } else if (0 == strcmp(channel_args->args[i].key,
246 GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
247 const grpc_integer_options options = {-1, 0, INT_MAX};
249 grpc_channel_arg_get_integer(&channel_args->args[i], options);
251 grpc_chttp2_hpack_compressor_set_max_usable_size(
252 &t->hpack_compressor, static_cast<uint32_t>(value));
254 } else if (0 == strcmp(channel_args->args[i].key,
255 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
256 t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
257 &channel_args->args[i],
258 {g_default_max_pings_without_data, 0, INT_MAX});
259 } else if (0 == strcmp(channel_args->args[i].key,
260 GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
261 t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
262 &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
264 strcmp(channel_args->args[i].key,
265 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
266 t->ping_policy.min_sent_ping_interval_without_data =
267 grpc_channel_arg_get_integer(
268 &channel_args->args[i],
269 grpc_integer_options{
270 g_default_min_sent_ping_interval_without_data_ms, 0,
273 strcmp(channel_args->args[i].key,
274 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
275 t->ping_policy.min_recv_ping_interval_without_data =
276 grpc_channel_arg_get_integer(
277 &channel_args->args[i],
278 grpc_integer_options{
279 g_default_min_recv_ping_interval_without_data_ms, 0,
281 } else if (0 == strcmp(channel_args->args[i].key,
282 GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
283 t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
284 &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
286 strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
287 enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
289 strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
290 const int value = grpc_channel_arg_get_integer(
291 &channel_args->args[i],
292 grpc_integer_options{t->is_client
293 ? g_default_client_keepalive_time_ms
294 : g_default_server_keepalive_time_ms,
/* INT_MAX is the "disabled" sentinel: map it to an infinite deadline. */
296 t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
297 } else if (0 == strcmp(channel_args->args[i].key,
298 GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
299 const int value = grpc_channel_arg_get_integer(
300 &channel_args->args[i],
301 grpc_integer_options{t->is_client
302 ? g_default_client_keepalive_timeout_ms
303 : g_default_server_keepalive_timeout_ms,
305 t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
306 } else if (0 == strcmp(channel_args->args[i].key,
307 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
308 t->keepalive_permit_without_calls = static_cast<uint32_t>(
309 grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
310 } else if (0 == strcmp(channel_args->args[i].key,
311 GRPC_ARG_OPTIMIZATION_TARGET)) {
312 if (channel_args->args[i].type != GRPC_ARG_STRING) {
313 gpr_log(GPR_ERROR, "%s should be a string",
314 GRPC_ARG_OPTIMIZATION_TARGET);
/* "blend" and "latency" both map to the latency target. */
315 } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
316 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
317 } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
318 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
320 strcmp(channel_args->args[i].value.string, "throughput")) {
321 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
323 gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
324 GRPC_ARG_OPTIMIZATION_TARGET,
325 channel_args->args[i].value.string);
328 strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
329 channelz_enabled = grpc_channel_arg_get_bool(
330 &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
/* Fallback: table mapping generic channel args to HTTP/2 SETTINGS ids, with
   per-side availability (index 0 = server, 1 = client per the comment). */
332 static const struct {
333 const char* channel_arg_name;
334 grpc_chttp2_setting_id setting_id;
335 grpc_integer_options integer_options;
336 bool availability[2] /* server, client */;
337 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
338 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
341 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
342 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
345 {GRPC_ARG_MAX_METADATA_SIZE,
346 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
/* RFC 7540 bounds for SETTINGS_MAX_FRAME_SIZE: 16384..16777215. */
349 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
350 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
351 {-1, 16384, 16777215},
353 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
354 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
357 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
358 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
361 for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
362 if (0 == strcmp(channel_args->args[i].key,
363 settings_map[j].channel_arg_name)) {
364 if (!settings_map[j].availability[is_client]) {
365 gpr_log(GPR_DEBUG, "%s is not available on %s",
366 settings_map[j].channel_arg_name,
367 is_client ? "clients" : "servers");
369 int value = grpc_channel_arg_get_integer(
370 &channel_args->args[i], settings_map[j].integer_options);
372 queue_setting_update(t, settings_map[j].setting_id,
373 static_cast<uint32_t>(value));
381 if (channelz_enabled) {
382 // TODO(ncteisen): add an API to endpoint to query for local addr, and pass
383 // it in here, so SocketNode knows its own address.
384 char* socket_name = nullptr;
385 gpr_asprintf(&socket_name, "%s %s", get_vtable()->name, t->peer_string);
387 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
388 grpc_core::UniquePtr<char>(),
389 grpc_core::UniquePtr<char>(gpr_strdup(t->peer_string)),
390 grpc_core::UniquePtr<char>(socket_name));
/* Initializes every transport-level closure, all scheduled on the transport's
   combiner so their callbacks run serialized with respect to one another.
   Each closure wraps one of the *_locked callbacks forward-declared above. */
395 static void init_transport_closures(grpc_chttp2_transport* t) {
396 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
397 grpc_combiner_scheduler(t->combiner));
398 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
399 grpc_combiner_scheduler(t->combiner));
400 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
401 destructive_reclaimer_locked, t,
402 grpc_combiner_scheduler(t->combiner));
403 GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
404 t, grpc_combiner_scheduler(t->combiner));
405 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
406 grpc_combiner_scheduler(t->combiner));
407 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
408 grpc_combiner_scheduler(t->combiner));
409 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
410 next_bdp_ping_timer_expired_locked, t,
411 grpc_combiner_scheduler(t->combiner));
412 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
413 t, grpc_combiner_scheduler(t->combiner))
414 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
415 start_keepalive_ping_locked, t,
416 grpc_combiner_scheduler(t->combiner));
417 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
418 finish_keepalive_ping_locked, t,
419 grpc_combiner_scheduler(t->combiner));
420 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
421 keepalive_watchdog_fired_locked, t,
422 grpc_combiner_scheduler(t->combiner));
/* Seeds the transport's keepalive parameters from the per-side process-wide
   defaults, mapping the INT_MAX sentinel to GRPC_MILLIS_INF_FUTURE (disabled).
   NOTE(review): the `if (t->is_client)` / `else` lines selecting between the
   two branches are elided from this extract. */
425 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
427 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
428 ? GRPC_MILLIS_INF_FUTURE
429 : g_default_client_keepalive_time_ms;
430 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
431 ? GRPC_MILLIS_INF_FUTURE
432 : g_default_client_keepalive_timeout_ms;
433 t->keepalive_permit_without_calls =
434 g_default_client_keepalive_permit_without_calls;
436 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
437 ? GRPC_MILLIS_INF_FUTURE
438 : g_default_server_keepalive_time_ms;
439 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
440 ? GRPC_MILLIS_INF_FUTURE
441 : g_default_server_keepalive_timeout_ms;
442 t->keepalive_permit_without_calls =
443 g_default_server_keepalive_permit_without_calls;
/* Copies the process-wide ping-policy defaults into this transport's policy
   (they may subsequently be overridden by channel args in read_channel_args). */
447 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
448 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
449 t->ping_policy.min_sent_ping_interval_without_data =
450 g_default_min_sent_ping_interval_without_data_ms;
451 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
452 t->ping_policy.min_recv_ping_interval_without_data =
453 g_default_min_recv_ping_interval_without_data_ms;
/* Arms the first keepalive ping timer if keepalive is enabled (i.e. the
   keepalive time is finite).  Takes a transport ref that is owned by the
   pending init_keepalive_ping_locked closure. */
456 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
457 if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
458 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
459 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
460 grpc_timer_init(&t->keepalive_ping_timer,
461 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
462 &t->init_keepalive_ping_locked);
464 /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
465 inflight keepalive timers */
466 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
/* Transport constructor: wires up the combiner, stream map, buffers, HPACK
   codec, closures, default settings/ping/keepalive policy, reads channel
   args, initializes flow control, arms keepalive/BDP pings, and queues the
   initial write.  Clients additionally enqueue the HTTP/2 connection preface.
   NOTE(review): several lines (loop-index declarations, closing braces, the
   second argument to TransportFlowControl::Init, and some conditionals such
   as the is_client guard around the connect string) are elided here. */
470 grpc_chttp2_transport::grpc_chttp2_transport(
471 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
472 grpc_resource_user* resource_user)
473 : refs(1, &grpc_trace_chttp2_refcount),
475 peer_string(grpc_endpoint_get_peer(ep)),
476 resource_user(resource_user),
477 combiner(grpc_combiner_create()),
478 is_client(is_client),
479 next_stream_id(is_client ? 1 : 2),
480 deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
481 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
482 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
483 base.vtable = get_vtable();
484 /* 8 is a random stab in the dark as to a good initial size: it's small enough
485 that it shouldn't waste memory for infrequently used connections, yet
486 large enough that the exponential growth should happen nicely when it's
488 TODO(ctiller): tune this */
489 grpc_chttp2_stream_map_init(&stream_map, 8);
491 grpc_slice_buffer_init(&read_buffer);
492 grpc_connectivity_state_init(
493 &channel_callback.state_tracker, GRPC_CHANNEL_READY,
494 is_client ? "client_transport" : "server_transport");
495 grpc_slice_buffer_init(&outbuf);
/* Client connection preface goes out first in the initial write. */
497 grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
498 GRPC_CHTTP2_CLIENT_CONNECT_STRING));
500 grpc_chttp2_hpack_compressor_init(&hpack_compressor);
501 grpc_slice_buffer_init(&qbuf);
502 /* copy in initial settings to all setting sets */
505 for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
506 for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
507 settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
510 grpc_chttp2_hpack_parser_init(&hpack_parser);
511 grpc_chttp2_goaway_parser_init(&goaway_parser);
513 init_transport_closures(this);
515 /* configure http2 the way we like it */
517 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
518 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
520 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
521 DEFAULT_MAX_HEADER_LIST_SIZE);
522 queue_setting_update(this,
523 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
525 configure_transport_ping_policy(this);
526 init_transport_keepalive_settings(this);
528 bool enable_bdp = true;
530 enable_bdp = read_channel_args(this, channel_args, is_client);
533 if (g_flow_control_enabled) {
534 flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
537 flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
541 /* No pings allowed before receiving a header or data frame. */
542 ping_state.pings_before_data_required = 0;
543 ping_state.is_delayed_ping_timer_set = false;
544 ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
546 ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
547 ping_recv_state.ping_strikes = 0;
549 init_keepalive_pings_if_enabled(this);
/* BDP probing: ref owned by the scheduled ping closure chain. */
552 GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping");
553 schedule_bdp_ping_locked(this);
554 grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
558 grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
559 post_benign_reclaimer(this);
/* Combiner callback for transport destruction: closes the transport with a
   "Transport destroyed" error annotated with the current write state, then
   drops the destruction ref.  The unref can free `t`, hence "last line". */
562 static void destroy_transport_locked(void* tp, grpc_error* error) {
563 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
565 close_transport_locked(
566 t, grpc_error_set_int(
567 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
568 GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
569 // Must be the last line.
570 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
/* Public vtable entry: hops onto the transport combiner to run
   destroy_transport_locked serialized with other transport ops.
   NOTE(review): the closing argument of GRPC_CLOSURE_SCHED and the function's
   closing brace are on lines elided from this extract. */
573 static void destroy_transport(grpc_transport* gt) {
574 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
575 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
576 grpc_combiner_scheduler(t->combiner)),
/* Shuts the transport down under the combiner: fails all calls and pending
   pings, records closed_with_error, cancels ping/BDP/keepalive timers, flushes
   the writable-stream list, and shuts down the endpoint.  If a write is in
   flight, closing is deferred until the write finishes (see set_write_state).
   NOTE(review): `break` statements in the keepalive switch and several closing
   braces are on elided lines; the fallthrough structure shown here is
   incomplete. */
580 static void close_transport_locked(grpc_chttp2_transport* t,
582 end_all_the_calls(t, GRPC_ERROR_REF(error));
583 cancel_pings(t, GRPC_ERROR_REF(error));
584 if (t->closed_with_error == GRPC_ERROR_NONE) {
/* Ensure callers always observe a concrete gRPC status on close. */
585 if (!grpc_error_has_clear_grpc_status(error)) {
586 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
587 GRPC_STATUS_UNAVAILABLE);
/* A write is in progress: stash the error and close once it completes. */
589 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
590 if (t->close_transport_on_writes_finished == nullptr) {
591 t->close_transport_on_writes_finished =
592 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
593 "Delayed close due to in-progress write");
595 t->close_transport_on_writes_finished =
596 grpc_error_add_child(t->close_transport_on_writes_finished, error);
599 GPR_ASSERT(error != GRPC_ERROR_NONE);
600 t->closed_with_error = GRPC_ERROR_REF(error);
601 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
602 if (t->ping_state.is_delayed_ping_timer_set) {
603 grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
605 if (t->have_next_bdp_ping_timer) {
606 grpc_timer_cancel(&t->next_bdp_ping_timer);
608 switch (t->keepalive_state) {
609 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
610 grpc_timer_cancel(&t->keepalive_ping_timer);
612 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
613 grpc_timer_cancel(&t->keepalive_ping_timer);
614 grpc_timer_cancel(&t->keepalive_watchdog_timer);
616 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
617 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
618 /* keepalive timers are not set in these two states */
622 /* flush writable stream list to avoid dangling references */
623 grpc_chttp2_stream* s;
624 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
625 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
627 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
628 grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
630 if (t->notify_on_receive_settings != nullptr) {
631 GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
632 t->notify_on_receive_settings = nullptr;
634 GRPC_ERROR_UNREF(error);
/* Thin wrappers forwarding stream refcounting to the generic grpc_stream
   ref/unref.  Two overload pairs: with a `reason` string (presumably compiled
   only under debug refcount tracing — the surrounding #ifndef NDEBUG lines are
   not visible in this extract) and without. */
638 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
639 grpc_stream_ref(s->refcount, reason);
641 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
642 grpc_stream_unref(s->refcount, reason);
645 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
646 grpc_stream_ref(s->refcount);
648 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
649 grpc_stream_unref(s->refcount);
/* Takes the initial stream ref and a transport ref for a new stream; runs
   before the rest of the stream's members are constructed. */
653 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
654 /* We reserve one 'active stream' that's dropped when the stream is
655 read-closed. The others are for Chttp2IncomingByteStreams that are
657 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
658 GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
/* Stream constructor.  On the server path (`server_data` non-null, judging by
   the id assignment — the guarding conditional is elided here) the HTTP/2
   stream id arrives encoded in `server_data`, the stream is registered in the
   transport's stream map, and the accepting-stream out-param is filled in.
   Per-stream flow control mirrors the transport's enabled/disabled mode. */
661 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
662 grpc_stream_refcount* refcount,
663 const void* server_data,
664 grpc_core::Arena* arena)
668 metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
669 grpc_chttp2_incoming_metadata_buffer(arena)} {
671 id = static_cast<uint32_t>((uintptr_t)server_data);
672 *t->accepting_stream = this;
673 grpc_chttp2_stream_map_add(&t->stream_map, id, this);
674 post_destructive_reclaimer(t);
676 if (t->flow_control->flow_control_enabled()) {
677 flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
678 static_cast<grpc_core::chttp2::TransportFlowControl*>(
679 t->flow_control.get()),
682 flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
685 grpc_slice_buffer_init(&frame_storage);
686 grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
687 grpc_slice_buffer_init(&flow_controlled_buffer);
/* Per-stream closures run on the transport combiner, like transport ones. */
689 GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
690 grpc_combiner_scheduler(t->combiner));
691 GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this,
692 grpc_combiner_scheduler(t->combiner));
/* Stream destructor: records stream outcome for channelz, validates the
   stream is fully closed and unmapped, frees buffers and compression state,
   removes the stream from stalled lists, asserts no pending ops remain,
   returns the memory-quota allocation, drops the transport ref, and schedules
   the caller-provided destroy closure.
   NOTE(review): closing braces and some intervening lines are elided. */
695 grpc_chttp2_stream::~grpc_chttp2_stream() {
696 if (t->channelz_socket != nullptr) {
/* A stream "succeeded" for channelz if EOS was seen from the peer (client)
   or sent to the peer (server). */
697 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
698 t->channelz_socket->RecordStreamSucceeded();
700 t->channelz_socket->RecordStreamFailed();
704 GPR_ASSERT((write_closed && read_closed) || id == 0);
706 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
709 grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
710 grpc_slice_buffer_destroy_internal(&frame_storage);
/* Compression buffers exist only when the corresponding method is not
   identity (see destroy_stream for the matching context teardown). */
711 if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
712 grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
714 if (stream_decompression_method !=
715 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
716 grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
719 grpc_chttp2_list_remove_stalled_by_transport(t, this);
720 grpc_chttp2_list_remove_stalled_by_stream(t, this);
722 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
723 if (GPR_UNLIKELY(included[i])) {
724 gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
725 t->is_client ? "client" : "server", id, i);
730 GPR_ASSERT(send_initial_metadata_finished == nullptr);
731 GPR_ASSERT(fetching_send_message == nullptr);
732 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
733 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
734 GPR_ASSERT(recv_message_ready == nullptr);
735 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
736 grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
737 GRPC_ERROR_UNREF(read_closed_error);
738 GRPC_ERROR_UNREF(write_closed_error);
739 GRPC_ERROR_UNREF(byte_stream_error);
741 flow_control.Destroy();
/* Give back the quota taken in grpc_chttp2_parsing_accept_stream. */
743 if (t->resource_user != nullptr) {
744 grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
747 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
748 GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
/* Vtable entry: placement-constructs a grpc_chttp2_stream in caller-provided
   storage `gs`.  NOTE(review): the return statement and closing brace are on
   elided lines. */
751 static int init_stream(grpc_transport* gt, grpc_stream* gs,
752 grpc_stream_refcount* refcount, const void* server_data,
753 grpc_core::Arena* arena) {
754 GPR_TIMER_SCOPE("init_stream", 0);
755 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
756 new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
/* Combiner callback: explicitly runs the stream destructor (mirrors the
   placement-new in init_stream; storage is owned by the caller). */
760 static void destroy_stream_locked(void* sp, grpc_error* error) {
761 GPR_TIMER_SCOPE("destroy_stream", 0);
762 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
763 s->~grpc_chttp2_stream();
/* Vtable entry: frees stream (de)compression contexts eagerly (safe off the
   combiner), then hops onto the combiner to run destroy_stream_locked and
   eventually fire `then_schedule_closure` (stored as destroy_stream_arg).
   NOTE(review): the GRPC_CLOSURE_SCHED wrapping the closure-init, and the
   closing brace, are on elided lines. */
766 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
767 grpc_closure* then_schedule_closure) {
768 GPR_TIMER_SCOPE("destroy_stream", 0);
769 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
770 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
771 if (s->stream_compression_method !=
772 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
773 s->stream_compression_ctx != nullptr) {
774 grpc_stream_compression_context_destroy(s->stream_compression_ctx);
775 s->stream_compression_ctx = nullptr;
777 if (s->stream_decompression_method !=
778 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
779 s->stream_decompression_ctx != nullptr) {
780 grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
781 s->stream_decompression_ctx = nullptr;
784 s->destroy_stream_arg = then_schedule_closure;
786 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
787 grpc_combiner_scheduler(t->combiner)),
/* Called from the parser when the peer opens stream `id`.  Returns nullptr
   when no accept callback is registered or memory quota is exhausted (in the
   latter case the stream is refused with RST_STREAM/REFUSED_STREAM rather than
   accepted-then-cancelled, to avoid the cost of creating the call).  Otherwise
   invokes the registered accept callback, which constructs the stream; the
   new stream reports itself through t->accepting_stream (see the stream ctor).
   NOTE(review): early `return nullptr;` lines and the final return are elided. */
791 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
793 if (t->channel_callback.accept_stream == nullptr) {
796 // Don't accept the stream if memory quota doesn't allow. Note that we should
797 // simply refuse the stream here instead of canceling the stream after it's
798 // accepted since the latter will create the call which costs much memory.
799 if (t->resource_user != nullptr &&
800 !grpc_resource_user_safe_alloc(t->resource_user,
801 GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
802 gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
803 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
805 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
808 grpc_chttp2_stream* accepting = nullptr;
809 GPR_ASSERT(t->accepting_stream == nullptr);
810 t->accepting_stream = &accepting;
811 t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
813 (void*)static_cast<uintptr_t>(id));
814 t->accepting_stream = nullptr;
818 /*******************************************************************************
/* Maps a write-state enum to a human-readable name for trace logging.
   NOTE(review): the return statements for the IDLE and WRITING cases are on
   elided lines. */
822 static const char* write_state_name(grpc_chttp2_write_state st) {
824 case GRPC_CHTTP2_WRITE_STATE_IDLE:
826 case GRPC_CHTTP2_WRITE_STATE_WRITING:
828 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
829 return "WRITING+MORE";
831 GPR_UNREACHABLE_CODE(return "UNKNOWN");
/* Transitions the transport's write state (with tracing).  When the state
   returns to IDLE, schedules all run_after_write closures and performs any
   close that was deferred because a write was in flight (the delayed-close
   error stashed by close_transport_locked). */
834 static void set_write_state(grpc_chttp2_transport* t,
835 grpc_chttp2_write_state st, const char* reason) {
836 GRPC_CHTTP2_IF_TRACING(
837 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
838 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
839 write_state_name(t->write_state), write_state_name(st), reason));
841 /* If the state is being reset back to idle, it means a write was just
842 * finished. Make sure all the run_after_write closures are scheduled.
844 * This is also our chance to close the transport if the transport was marked
845 * to be closed after all writes finish (for example, if we received a go-away
846 * from peer while we had some pending writes) */
847 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
848 GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
849 if (t->close_transport_on_writes_finished != nullptr) {
850 grpc_error* err = t->close_transport_on_writes_finished;
851 t->close_transport_on_writes_finished = nullptr;
852 close_transport_locked(t, err);
/* Bumps the per-reason stats counter for a write initiation; one-to-one
   mapping from grpc_chttp2_initiate_write_reason to a GRPC_STATS counter.
   NOTE(review): the `break` after each case is on elided lines. */
857 static void inc_initiate_write_reason(
858 grpc_chttp2_initiate_write_reason reason) {
860 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
861 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
863 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
864 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
866 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
867 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
869 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
870 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
872 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
873 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
875 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
876 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
878 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
879 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
881 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
882 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
884 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
885 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
887 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
888 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
890 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
891 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
893 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
894 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
896 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
897 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
899 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
900 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
902 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
903 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
905 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
906 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
908 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
909 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
911 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
912 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
914 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
915 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
917 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
918 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
923 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
924 grpc_chttp2_initiate_write_reason reason) {
925 GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
927 switch (t->write_state) {
928 case GRPC_CHTTP2_WRITE_STATE_IDLE:
929 inc_initiate_write_reason(reason);
930 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
931 grpc_chttp2_initiate_write_reason_string(reason));
932 t->is_first_write_in_batch = true;
933 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
934 /* Note that the 'write_action_begin_locked' closure is being scheduled
935 * on the 'finally_scheduler' of t->combiner. This means that
936 * 'write_action_begin_locked' is called only *after* all the other
937 * closures (some of which are potentially initiating more writes on the
938 * transport) are executed on the t->combiner.
940 * The reason for scheduling on finally_scheduler is to make sure we batch
941 * as many writes as possible. 'write_action_begin_locked' is the function
942 * that gathers all the relevant bytes (which are at various places in the
943 * grpc_chttp2_transport structure) and append them to 'outbuf' field in
944 * grpc_chttp2_transport thereby batching what would have been potentially
945 * multiple write operations.
947 * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
948 * It does not call the endpoint to write the bytes. That is done by the
949 * 'write_action' (which is scheduled by 'write_action_begin_locked') */
951 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
952 write_action_begin_locked, t,
953 grpc_combiner_finally_scheduler(t->combiner)),
956 case GRPC_CHTTP2_WRITE_STATE_WRITING:
957 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
958 grpc_chttp2_initiate_write_reason_string(reason));
960 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
965 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
966 grpc_chttp2_stream* s) {
967 if (t->closed_with_error == GRPC_ERROR_NONE &&
968 grpc_chttp2_list_add_writable_stream(t, s)) {
969 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
973 static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
974 bool early_results_scheduled,
975 bool partial_write) {
976 // If we're already in a background poller, don't offload this to an executor
977 if (grpc_iomgr_is_any_background_poller_thread()) {
978 return grpc_schedule_on_exec_ctx;
980 /* if it's not the first write in a batch, always offload to the executor:
981 we'll probably end up queuing against the kernel anyway, so we'll likely
982 get better latency overall if we switch writing work elsewhere and continue
983 with application work above */
984 if (!t->is_first_write_in_batch) {
985 return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
987 /* equivalently, if it's a partial write, we *know* we're going to be taking a
988 thread jump to write it because of the above, may as well do so
991 return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
993 switch (t->opt_target) {
994 case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
995 /* executor gives us the largest probability of being able to batch a
996 * write with others on this transport */
997 return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
998 case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
999 return grpc_schedule_on_exec_ctx;
1001 GPR_UNREACHABLE_CODE(return nullptr);
1004 #define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
1005 static const char* begin_writing_desc(bool partial, bool inlined) {
1006 switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
1007 case WRITE_STATE_TUPLE_TO_INT(false, false):
1008 return "begin write in background";
1009 case WRITE_STATE_TUPLE_TO_INT(false, true):
1010 return "begin write in current thread";
1011 case WRITE_STATE_TUPLE_TO_INT(true, false):
1012 return "begin partial write in background";
1013 case WRITE_STATE_TUPLE_TO_INT(true, true):
1014 return "begin partial write in current thread";
1016 GPR_UNREACHABLE_CODE(return "bad state tuple");
1019 static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
1020 GPR_TIMER_SCOPE("write_action_begin_locked", 0);
1021 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1022 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
1023 grpc_chttp2_begin_write_result r;
1024 if (t->closed_with_error != GRPC_ERROR_NONE) {
1027 r = grpc_chttp2_begin_write(t);
1031 GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
1033 if (!t->is_first_write_in_batch) {
1034 GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
1036 grpc_closure_scheduler* scheduler =
1037 write_scheduler(t, r.early_results_scheduled, r.partial);
1038 if (scheduler != grpc_schedule_on_exec_ctx) {
1039 GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
1043 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1044 : GRPC_CHTTP2_WRITE_STATE_WRITING,
1045 begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
1047 GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
1049 if (t->reading_paused_on_pending_induced_frames) {
1050 GPR_ASSERT(t->num_pending_induced_frames == 0);
1051 /* We had paused reading, because we had many induced frames (SETTINGS
1052 * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1053 * been able to flush qbuf, we can resume reading. */
1054 GRPC_CHTTP2_IF_TRACING(gpr_log(
1056 "transport %p : Resuming reading after being paused due to too "
1057 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1059 t->reading_paused_on_pending_induced_frames = false;
1060 continue_read_action_locked(t);
1063 GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
1064 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1065 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1069 static void write_action(void* gt, grpc_error* error) {
1070 GPR_TIMER_SCOPE("write_action", 0);
1071 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1074 grpc_endpoint_write(
1076 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
1077 grpc_combiner_scheduler(t->combiner)),
1081 /* Callback from the grpc_endpoint after bytes have been written by calling
1083 static void write_action_end_locked(void* tp, grpc_error* error) {
1084 GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1085 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1087 bool closed = false;
1088 if (error != GRPC_ERROR_NONE) {
1089 close_transport_locked(t, GRPC_ERROR_REF(error));
1093 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1094 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1096 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1097 close_transport_locked(
1098 t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1102 switch (t->write_state) {
1103 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1104 GPR_UNREACHABLE_CODE(break);
1105 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1106 GPR_TIMER_MARK("state=writing", 0);
1107 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1109 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1110 GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1111 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1112 t->is_first_write_in_batch = false;
1113 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1114 // If the transport is closed, we will retry writing on the endpoint
1115 // and next write may contain part of the currently serialized frames.
1116 // So, we should only call the run_after_write callbacks when the next
1117 // write finishes, or the callbacks will be invoked when the stream is
1120 GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
1123 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1124 write_action_begin_locked, t,
1125 grpc_combiner_finally_scheduler(t->combiner)),
1130 grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1131 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1134 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1135 // If the change needs to occur immediately, manually initiate a write.
1136 static void queue_setting_update(grpc_chttp2_transport* t,
1137 grpc_chttp2_setting_id id, uint32_t value) {
1138 const grpc_chttp2_setting_parameters* sp =
1139 &grpc_chttp2_settings_parameters[id];
1140 uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1141 if (use_value != value) {
1142 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1145 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1146 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1147 t->dirtied_local_settings = 1;
1151 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1152 uint32_t goaway_error,
1153 uint32_t last_stream_id,
1154 const grpc_slice& goaway_text) {
1155 // Discard the error from a previous goaway frame (if any)
1156 if (t->goaway_error != GRPC_ERROR_NONE) {
1157 GRPC_ERROR_UNREF(t->goaway_error);
1159 t->goaway_error = grpc_error_set_str(
1162 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1163 GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1164 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1165 GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1167 GRPC_CHTTP2_IF_TRACING(
1168 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1170 /* We want to log this irrespective of whether http tracing is enabled if we
1171 * received a GOAWAY with a non NO_ERROR code. */
1172 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1173 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
1174 goaway_error, grpc_error_string(t->goaway_error));
1176 /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1177 * data equal to "too_many_pings", it should log the occurrence at a log level
1178 * that is enabled by default and double the configured KEEPALIVE_TIME used
1179 * for new connections on that channel. */
1180 if (GPR_UNLIKELY(t->is_client &&
1181 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1182 grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1184 "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1185 "data equal to \"too_many_pings\"");
1186 double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1188 current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
1189 ? GRPC_MILLIS_INF_FUTURE
1190 : static_cast<grpc_millis>(current_keepalive_time_ms *
1191 KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1194 /* lie: use transient failure from the transport to indicate goaway has been
1196 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
1199 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1200 grpc_chttp2_stream* s;
1201 /* cancel out streams that haven't yet started if we have received a GOAWAY */
1202 if (t->goaway_error != GRPC_ERROR_NONE) {
1203 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1204 grpc_chttp2_cancel_stream(
1207 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1208 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1212 /* start streams where we have free grpc_chttp2_stream ids and free
1214 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1215 grpc_chttp2_stream_map_size(&t->stream_map) <
1216 t->settings[GRPC_PEER_SETTINGS]
1217 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1218 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1219 /* safe since we can't (legally) be parsing this stream yet */
1220 GRPC_CHTTP2_IF_TRACING(gpr_log(
1222 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1223 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1225 GPR_ASSERT(s->id == 0);
1226 s->id = t->next_stream_id;
1227 t->next_stream_id += 2;
1229 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1230 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1231 "no_more_stream_ids");
1234 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1235 post_destructive_reclaimer(t);
1236 grpc_chttp2_mark_stream_writable(t, s);
1237 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1239 /* cancel out streams that will never be started */
1240 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1241 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1242 grpc_chttp2_cancel_stream(
1245 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1246 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1251 /* Flag that this closure barrier may be covering a write in a pollset, and so
1252 we should not complete this closure until we can prove that the write got
1254 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1255 /* First bit of the reference count, stored in the high order bits (with the low
1256 bits being used for flags defined above) */
1257 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1259 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1260 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1264 static void null_then_sched_closure(grpc_closure** closure) {
1265 grpc_closure* c = *closure;
1267 GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
1270 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1271 grpc_chttp2_stream* s,
1272 grpc_closure** pclosure,
1273 grpc_error* error, const char* desc) {
1274 grpc_closure* closure = *pclosure;
1275 *pclosure = nullptr;
1276 if (closure == nullptr) {
1277 GRPC_ERROR_UNREF(error);
1280 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1281 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1282 const char* errstr = grpc_error_string(error);
1285 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1288 static_cast<int>(closure->next_data.scratch /
1289 CLOSURE_BARRIER_FIRST_REF_BIT),
1290 static_cast<int>(closure->next_data.scratch %
1291 CLOSURE_BARRIER_FIRST_REF_BIT),
1292 desc, errstr, write_state_name(t->write_state));
1294 if (error != GRPC_ERROR_NONE) {
1295 if (closure->error_data.error == GRPC_ERROR_NONE) {
1296 closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1297 "Error in HTTP transport completing operation");
1298 closure->error_data.error = grpc_error_set_str(
1299 closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1300 grpc_slice_from_copied_string(t->peer_string));
1302 closure->error_data.error =
1303 grpc_error_add_child(closure->error_data.error, error);
1305 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1306 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1307 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1308 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1309 // closures earlier than when it is safe to do so.
1310 GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
1312 grpc_closure_list_append(&t->run_after_write, closure,
1313 closure->error_data.error);
1318 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1319 if (batch->idx.named.grpc_status != nullptr) {
1320 return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1321 GRPC_MDELEM_GRPC_STATUS_0);
1326 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1327 grpc_chttp2_stream* s) {
1328 if (s->id != 0 && (!s->write_buffering ||
1329 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1330 grpc_chttp2_mark_stream_writable(t, s);
1331 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1335 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1336 grpc_chttp2_stream* s) {
1337 s->fetched_send_message_length +=
1338 static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1339 grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1340 maybe_become_writable_due_to_send_msg(t, s);
1343 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1344 grpc_chttp2_stream* s) {
1346 if (s->fetching_send_message == nullptr) {
1347 /* Stream was cancelled before message fetch completed */
1348 abort(); /* TODO(ctiller): what cleanup here? */
1349 return; /* early out */
1351 if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1352 int64_t notify_offset = s->next_message_end_offset;
1353 if (notify_offset <= s->flow_controlled_bytes_written) {
1354 grpc_chttp2_complete_closure_step(
1355 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1356 "fetching_send_message_finished");
1358 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1359 if (cb == nullptr) {
1360 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1362 t->write_cb_pool = cb->next;
1364 cb->call_at_byte = notify_offset;
1365 cb->closure = s->fetching_send_message_finished;
1366 s->fetching_send_message_finished = nullptr;
1367 grpc_chttp2_write_cb** list =
1368 s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1369 ? &s->on_write_finished_cbs
1370 : &s->on_flow_controlled_cbs;
1374 s->fetching_send_message.reset();
1375 return; /* early out */
1376 } else if (s->fetching_send_message->Next(UINT32_MAX,
1377 &s->complete_fetch_locked)) {
1378 grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1379 if (error != GRPC_ERROR_NONE) {
1380 s->fetching_send_message.reset();
1381 grpc_chttp2_cancel_stream(t, s, error);
1383 add_fetched_slice_locked(t, s);
1389 static void complete_fetch_locked(void* gs, grpc_error* error) {
1390 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1391 grpc_chttp2_transport* t = s->t;
1392 if (error == GRPC_ERROR_NONE) {
1393 error = s->fetching_send_message->Pull(&s->fetching_slice);
1394 if (error == GRPC_ERROR_NONE) {
1395 add_fetched_slice_locked(t, s);
1396 continue_fetching_send_locked(t, s);
1399 if (error != GRPC_ERROR_NONE) {
1400 s->fetching_send_message.reset();
1401 grpc_chttp2_cancel_stream(t, s, error);
1405 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1406 bool is_client, bool is_initial) {
1407 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1409 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1410 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1411 gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1412 is_client ? "CLI" : "SVR", key, value);
1418 static void perform_stream_op_locked(void* stream_op,
1419 grpc_error* error_ignored) {
1420 GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1422 grpc_transport_stream_op_batch* op =
1423 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1424 grpc_chttp2_stream* s =
1425 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1426 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1427 grpc_chttp2_transport* t = s->t;
1429 GRPC_STATS_INC_HTTP2_OP_BATCHES();
1431 s->context = op->payload->context;
1432 s->traced = op->is_traced;
1433 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1434 char* str = grpc_transport_stream_op_batch_string(op);
1435 gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
1438 if (op->send_initial_metadata) {
1439 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1440 s->id, t->is_client, true);
1442 if (op->send_trailing_metadata) {
1443 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1444 s->id, t->is_client, false);
1448 grpc_closure* on_complete = op->on_complete;
1449 // on_complete will be null if and only if there are no send ops in the batch.
1450 if (on_complete != nullptr) {
1451 // This batch has send ops. Use final_data as a barrier until enqueue time;
1452 // the initial counter is dropped at the end of this function.
1453 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1454 on_complete->error_data.error = GRPC_ERROR_NONE;
1457 if (op->cancel_stream) {
1458 GRPC_STATS_INC_HTTP2_OP_CANCEL();
1459 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1462 if (op->send_initial_metadata) {
1463 if (t->is_client && t->channelz_socket != nullptr) {
1464 t->channelz_socket->RecordStreamStartedFromLocal();
1466 GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1467 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1468 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1470 /* Identify stream compression */
1471 if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1472 .content_encoding == nullptr ||
1473 grpc_stream_compression_method_parse(
1475 op_payload->send_initial_metadata.send_initial_metadata->idx
1476 .named.content_encoding->md),
1477 true, &s->stream_compression_method) == 0) {
1478 s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1480 if (s->stream_compression_method !=
1481 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1482 s->uncompressed_data_size = 0;
1483 s->stream_compression_ctx = nullptr;
1484 grpc_slice_buffer_init(&s->compressed_data_buffer);
1486 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1487 s->send_initial_metadata =
1488 op_payload->send_initial_metadata.send_initial_metadata;
1489 const size_t metadata_size =
1490 grpc_metadata_batch_size(s->send_initial_metadata);
1491 const size_t metadata_peer_limit =
1492 t->settings[GRPC_PEER_SETTINGS]
1493 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1495 s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1497 if (metadata_size > metadata_peer_limit) {
1498 grpc_chttp2_cancel_stream(
1502 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1503 "to-be-sent initial metadata size "
1504 "exceeds peer limit"),
1505 GRPC_ERROR_INT_SIZE,
1506 static_cast<intptr_t>(metadata_size)),
1507 GRPC_ERROR_INT_LIMIT,
1508 static_cast<intptr_t>(metadata_peer_limit)),
1509 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1511 if (contains_non_ok_status(s->send_initial_metadata)) {
1512 s->seen_error = true;
1514 if (!s->write_closed) {
1516 if (t->closed_with_error == GRPC_ERROR_NONE) {
1517 GPR_ASSERT(s->id == 0);
1518 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1519 maybe_start_some_streams(t);
1521 grpc_chttp2_cancel_stream(
1524 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1525 "Transport closed", &t->closed_with_error, 1),
1526 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1529 GPR_ASSERT(s->id != 0);
1530 grpc_chttp2_mark_stream_writable(t, s);
1531 if (!(op->send_message &&
1532 (op->payload->send_message.send_message->flags() &
1533 GRPC_WRITE_BUFFER_HINT))) {
1534 grpc_chttp2_initiate_write(
1535 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1539 s->send_initial_metadata = nullptr;
1540 grpc_chttp2_complete_closure_step(
1541 t, s, &s->send_initial_metadata_finished,
1542 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1543 "Attempt to send initial metadata after stream was closed",
1544 &s->write_closed_error, 1),
1545 "send_initial_metadata_finished");
1548 if (op_payload->send_initial_metadata.peer_string != nullptr) {
1549 gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1550 (gpr_atm)t->peer_string);
1554 if (op->send_message) {
1555 GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1556 t->num_messages_in_next_write++;
1557 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1558 op->payload->send_message.send_message->length());
1559 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1560 s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1561 if (s->write_closed) {
1562 op->payload->send_message.stream_write_closed = true;
1563 // We should NOT return an error here, so as to avoid a cancel OP being
1564 // started. The surface layer will notice that the stream has been closed
1565 // for writes and fail the send message op.
1566 op->payload->send_message.send_message.reset();
1567 grpc_chttp2_complete_closure_step(
1568 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1569 "fetching_send_message_finished");
1571 GPR_ASSERT(s->fetching_send_message == nullptr);
1572 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1573 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1574 uint32_t flags = op_payload->send_message.send_message->flags();
1575 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1576 size_t len = op_payload->send_message.send_message->length();
1577 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1578 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1579 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1580 frame_hdr[4] = static_cast<uint8_t>(len);
1581 s->fetching_send_message =
1582 std::move(op_payload->send_message.send_message);
1583 s->fetched_send_message_length = 0;
1584 s->next_message_end_offset =
1585 s->flow_controlled_bytes_written +
1586 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1587 static_cast<int64_t>(len);
1588 if (flags & GRPC_WRITE_BUFFER_HINT) {
1589 s->next_message_end_offset -= t->write_buffer_size;
1590 s->write_buffering = true;
1592 s->write_buffering = false;
1594 continue_fetching_send_locked(t, s);
1595 maybe_become_writable_due_to_send_msg(t, s);
1599 if (op->send_trailing_metadata) {
1600 GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1601 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1602 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1603 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1604 s->send_trailing_metadata =
1605 op_payload->send_trailing_metadata.send_trailing_metadata;
1606 s->write_buffering = false;
1607 const size_t metadata_size =
1608 grpc_metadata_batch_size(s->send_trailing_metadata);
1609 const size_t metadata_peer_limit =
1610 t->settings[GRPC_PEER_SETTINGS]
1611 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1612 if (metadata_size > metadata_peer_limit) {
1613 grpc_chttp2_cancel_stream(
1617 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1618 "to-be-sent trailing metadata size "
1619 "exceeds peer limit"),
1620 GRPC_ERROR_INT_SIZE,
1621 static_cast<intptr_t>(metadata_size)),
1622 GRPC_ERROR_INT_LIMIT,
1623 static_cast<intptr_t>(metadata_peer_limit)),
1624 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1626 if (contains_non_ok_status(s->send_trailing_metadata)) {
1627 s->seen_error = true;
1629 if (s->write_closed) {
1630 s->send_trailing_metadata = nullptr;
1631 grpc_chttp2_complete_closure_step(
1632 t, s, &s->send_trailing_metadata_finished,
1633 grpc_metadata_batch_is_empty(
1634 op->payload->send_trailing_metadata.send_trailing_metadata)
1636 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1637 "Attempt to send trailing metadata after "
1638 "stream was closed"),
1639 "send_trailing_metadata_finished");
1640 } else if (s->id != 0) {
1641 /* TODO(ctiller): check if there's flow control for any outstanding
1642 bytes before going writable */
1643 grpc_chttp2_mark_stream_writable(t, s);
1644 grpc_chttp2_initiate_write(
1645 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1650 if (op->recv_initial_metadata) {
1651 GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1652 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1653 s->recv_initial_metadata_ready =
1654 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1655 s->recv_initial_metadata =
1656 op_payload->recv_initial_metadata.recv_initial_metadata;
1657 s->trailing_metadata_available =
1658 op_payload->recv_initial_metadata.trailing_metadata_available;
1659 if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1660 gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1661 (gpr_atm)t->peer_string);
1663 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1666 if (op->recv_message) {
1667 GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1669 GPR_ASSERT(s->recv_message_ready == nullptr);
1670 GPR_ASSERT(!s->pending_byte_stream);
1671 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1672 s->recv_message = op_payload->recv_message.recv_message;
1674 if (!s->read_closed) {
1675 before = s->frame_storage.length +
1676 s->unprocessed_incoming_frames_buffer.length;
1679 grpc_chttp2_maybe_complete_recv_message(t, s);
1681 if (!s->read_closed && s->frame_storage.length == 0) {
1682 size_t after = s->frame_storage.length +
1683 s->unprocessed_incoming_frames_buffer_cached_length;
1684 s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1686 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1691 if (op->recv_trailing_metadata) {
1692 GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1693 GPR_ASSERT(s->collecting_stats == nullptr);
1694 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1695 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1696 s->recv_trailing_metadata_finished =
1697 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1698 s->recv_trailing_metadata =
1699 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1700 s->final_metadata_requested = true;
1701 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1704 if (on_complete != nullptr) {
1705 grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1709 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
/* Transport vtable entry point for stream op batches.  Validates the batch on
   the calling thread, takes a stream ref, then hops onto the transport
   combiner so perform_stream_op_locked can execute it under the lock. */
1712 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1713 grpc_transport_stream_op_batch* op) {
1714 GPR_TIMER_SCOPE("perform_stream_op", 0);
1715 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1716 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
/* Servers never set per-stream deadlines on the wire: assert that any
   metadata being sent from the server side carries an infinite deadline. */
1718 if (!t->is_client) {
1719 if (op->send_initial_metadata) {
1720 grpc_millis deadline =
1721 op->payload->send_initial_metadata.send_initial_metadata->deadline;
1722 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1724 if (op->send_trailing_metadata) {
1725 grpc_millis deadline =
1726 op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1727 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1731 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1732 char* str = grpc_transport_stream_op_batch_string(op);
1733 gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
/* Ref is released by perform_stream_op_locked once the batch is processed. */
1737 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1738 op->handler_private.extra_arg = gs;
1740 GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
1741 op, grpc_combiner_scheduler(t->combiner)),
/* Fail every queued ping callback with `error`.  Takes ownership of `error`
   (unreffed at the end); a ref is handed to each closure list. */
1745 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1746 /* callback remaining pings: they're not allowed to call into the transport,
1747 and maybe they hold resources that need to be freed */
1748 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1749 GPR_ASSERT(error != GRPC_ERROR_NONE);
/* Fail pings in every lifecycle stage (initiate / next / inflight). */
1750 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1751 grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1752 GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
1754 GRPC_ERROR_UNREF(error);
/* Queue a ping: `on_initiate` runs when the ping frame is actually written,
   `on_ack` when the peer's PING ack arrives.  If the transport is already
   closed, both closures are scheduled immediately with the close error. */
1757 static void send_ping_locked(grpc_chttp2_transport* t,
1758 grpc_closure* on_initiate, grpc_closure* on_ack) {
1759 if (t->closed_with_error != GRPC_ERROR_NONE) {
1760 GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
1761 GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
1764 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1765 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
/* on_ack waits in the NEXT list until the ping is promoted to INFLIGHT. */
1767 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1772 * Specialized form of send_ping_locked for keepalive ping. If there is already
1773 * a ping in progress, the keepalive ping would piggyback onto that ping,
1774 * instead of waiting for that ping to complete and then starting a new ping.
1776 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
/* Transport already closed: run both keepalive closures with the close
   error instead of queueing a ping. */
1777 if (t->closed_with_error != GRPC_ERROR_NONE) {
1778 GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
1779 GRPC_ERROR_REF(t->closed_with_error));
1780 GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
1781 GRPC_ERROR_REF(t->closed_with_error));
1784 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1785 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1786 /* There is a ping in flight. Add yourself to the inflight closure list. */
1787 GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1788 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1789 &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
/* No ping in flight: queue a fresh one, same two-stage shape as
   send_ping_locked (initiate -> ack). */
1792 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1793 &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1794 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
1795 &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
/* Timer callback: the delayed-ping timer fired (or was cancelled, in which
   case `error` is non-NONE and we only drop the transport ref). */
1798 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1799 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1800 t->ping_state.is_delayed_ping_timer_set = false;
1801 if (error == GRPC_ERROR_NONE) {
1802 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
/* Balances the ref taken when the timer was armed. */
1804 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
/* Handle a PING ack from the peer.  An ack whose id does not match the
   inflight ping is logged as unknown; a matching ack schedules the inflight
   completion closures and, if more pings are queued, kicks off a write. */
1807 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1808 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1809 if (pq->inflight_id != id) {
1810 char* from = grpc_endpoint_get_peer(t->ep);
1811 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1815 GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1816 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1817 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
/* Queue a GOAWAY frame derived from `error` into t->qbuf and initiate a
   write.  Takes ownership of `error`.  The HTTP/2 error code and debug
   message are extracted from the grpc_error. */
1821 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1822 /* We want to log this irrespective of whether http tracing is enabled */
1823 gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
1824 grpc_error_string(error));
1825 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1826 grpc_http2_error_code http_error;
1828 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1829 &http_error, nullptr);
/* last_new_stream_id tells the peer the highest stream we will process. */
1830 grpc_chttp2_goaway_append(t->last_new_stream_id,
1831 static_cast<uint32_t>(http_error),
1832 grpc_slice_ref_internal(slice), &t->qbuf);
1833 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1834 GRPC_ERROR_UNREF(error);
/* Record one ping-policy violation by the peer.  Once strikes exceed the
   configured maximum (0 == unlimited), send GOAWAY(ENHANCE_YOUR_CALM) and
   close the transport with UNAVAILABLE. */
1837 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1838 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1839 t->ping_policy.max_ping_strikes != 0) {
1842 GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1843 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1844 /*The transport will be closed after the write is done */
1845 close_transport_locked(
1846 t, grpc_error_set_int(
1847 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1848 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
/* Execute a transport-level op batch under the combiner: goaway, accept-stream
   callback registration, pollset binding, pings, connectivity watches, and
   disconnect.  Releases the transport ref taken by perform_transport_op. */
1852 static void perform_transport_op_locked(void* stream_op,
1853 grpc_error* error_ignored) {
1854 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1855 grpc_chttp2_transport* t =
1856 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1858 if (op->goaway_error) {
/* send_goaway takes ownership of op->goaway_error. */
1859 send_goaway(t, op->goaway_error);
1862 if (op->set_accept_stream) {
1863 t->channel_callback.accept_stream = op->set_accept_stream_fn;
1864 t->channel_callback.accept_stream_user_data =
1865 op->set_accept_stream_user_data;
1868 if (op->bind_pollset) {
1869 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1872 if (op->bind_pollset_set) {
1873 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1876 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1877 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1878 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1881 if (op->on_connectivity_state_change != nullptr) {
1882 grpc_connectivity_state_notify_on_state_change(
1883 &t->channel_callback.state_tracker, op->connectivity_state,
1884 op->on_connectivity_state_change);
1887 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
/* close_transport_locked takes ownership of disconnect_with_error. */
1888 close_transport_locked(t, op->disconnect_with_error);
1891 GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
1893 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
/* Transport vtable entry point for transport-level ops: trace-log the batch,
   take a transport ref, and forward to perform_transport_op_locked on the
   combiner.  The ref is dropped by the locked handler. */
1896 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1897 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1898 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1899 char* msg = grpc_transport_op_string(op);
1900 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
1903 op->handler_private.extra_arg = gt;
1904 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1905 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1906 perform_transport_op_locked, op,
1907 grpc_combiner_scheduler(t->combiner)),
1911 /*******************************************************************************
1912 * INPUT PROCESSING - GENERAL
/* If the application asked for initial metadata and it has been published,
   deliver it and fire recv_initial_metadata_ready.  On a stream that has
   already seen an error, pending message data is discarded first. */
1915 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1916 grpc_chttp2_stream* s) {
1917 if (s->recv_initial_metadata_ready != nullptr &&
1918 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1919 if (s->seen_error) {
1920 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
/* Don't reset the unprocessed buffer while a byte stream still owns it. */
1921 if (!s->pending_byte_stream) {
1922 grpc_slice_buffer_reset_and_unref_internal(
1923 &s->unprocessed_incoming_frames_buffer);
1926 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1927 s->recv_initial_metadata);
1928 null_then_sched_closure(&s->recv_initial_metadata_ready);
/* If the application has a pending recv_message, try to deframe the next
   message out of the buffered incoming frames (decompressing first when the
   stream uses a non-identity stream-compression method) and complete the
   recv_message callback.  Errors mark the stream as seen_error and drop all
   buffered data. */
1932 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
1933 grpc_chttp2_stream* s) {
1934 grpc_error* error = GRPC_ERROR_NONE;
1935 if (s->recv_message_ready != nullptr) {
1936 *s->recv_message = nullptr;
/* Final metadata already requested after an error: discard buffered data. */
1937 if (s->final_metadata_requested && s->seen_error) {
1938 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1939 if (!s->pending_byte_stream) {
1940 grpc_slice_buffer_reset_and_unref_internal(
1941 &s->unprocessed_incoming_frames_buffer);
1944 if (!s->pending_byte_stream) {
1945 while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1946 s->frame_storage.length > 0) {
/* Refill the working buffer from frame_storage when it runs dry. */
1947 if (s->unprocessed_incoming_frames_buffer.length == 0) {
1948 grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1950 s->unprocessed_incoming_frames_decompressed = false;
1952 if (!s->unprocessed_incoming_frames_decompressed &&
1953 s->stream_decompression_method !=
1954 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1955 GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1956 bool end_of_context;
/* Lazily create the decompression context on first use. */
1957 if (!s->stream_decompression_ctx) {
1958 s->stream_decompression_ctx =
1959 grpc_stream_compression_context_create(
1960 s->stream_decompression_method);
/* Decompress just enough to complete the next message header
   (GRPC_HEADER_SIZE_IN_BYTES appears to be the 5-byte gRPC message
   header — confirm against frame_data.h). */
1962 if (!grpc_stream_decompress(
1963 s->stream_decompression_ctx,
1964 &s->unprocessed_incoming_frames_buffer,
1965 &s->decompressed_data_buffer, nullptr,
1966 GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1968 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1969 grpc_slice_buffer_reset_and_unref_internal(
1970 &s->unprocessed_incoming_frames_buffer);
1971 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1972 "Stream decompression error.");
1974 s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1975 if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1976 s->decompressed_header_bytes = 0;
1978 error = grpc_deframe_unprocessed_incoming_frames(
1979 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1981 if (end_of_context) {
1982 grpc_stream_compression_context_destroy(
1983 s->stream_decompression_ctx);
1984 s->stream_decompression_ctx = nullptr;
/* Identity compression (or already decompressed): deframe directly. */
1988 error = grpc_deframe_unprocessed_incoming_frames(
1989 &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1990 nullptr, s->recv_message);
1992 if (error != GRPC_ERROR_NONE) {
1993 s->seen_error = true;
1994 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1995 grpc_slice_buffer_reset_and_unref_internal(
1996 &s->unprocessed_incoming_frames_buffer);
1998 } else if (*s->recv_message != nullptr) {
2003 // save the length of the buffer before handing control back to application
2004 // threads. Needed to support correct flow control bookkeeping
2005 s->unprocessed_incoming_frames_buffer_cached_length =
2006 s->unprocessed_incoming_frames_buffer.length;
2007 if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
2008 null_then_sched_closure(&s->recv_message_ready);
/* Trailing metadata already published: deliver a null message so the
   application learns there are no more messages. */
2009 } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2010 *s->recv_message = nullptr;
2011 null_then_sched_closure(&s->recv_message_ready);
2013 GRPC_ERROR_UNREF(error);
/* Deliver trailing metadata once all message data has been consumed and the
   read side is closed.  First drains any residual data (e.g. compressed
   SYNC_FLUSH bytes left in frame_storage); only when nothing is pending does
   it publish metadata_buffer[1] and fire recv_trailing_metadata_finished. */
2017 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2018 grpc_chttp2_stream* s) {
/* Messages must be delivered (or discarded) before trailing metadata. */
2019 grpc_chttp2_maybe_complete_recv_message(t, s);
2020 if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2022 if (s->seen_error || !t->is_client) {
2023 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
2024 if (!s->pending_byte_stream) {
2025 grpc_slice_buffer_reset_and_unref_internal(
2026 &s->unprocessed_incoming_frames_buffer);
2029 bool pending_data = s->pending_byte_stream ||
2030 s->unprocessed_incoming_frames_buffer.length > 0;
2031 if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
2032 !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
2033 /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
2034 * maybe decompress the next 5 bytes in the stream. */
2035 if (s->stream_decompression_method ==
2036 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
/* Identity: move up to one message header's worth of bytes over. */
2037 grpc_slice_buffer_move_first(
2039 GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
2040 &s->unprocessed_incoming_frames_buffer);
2041 if (s->unprocessed_incoming_frames_buffer.length > 0) {
2042 s->unprocessed_incoming_frames_decompressed = true;
2043 pending_data = true;
2046 bool end_of_context;
2047 if (!s->stream_decompression_ctx) {
2048 s->stream_decompression_ctx = grpc_stream_compression_context_create(
2049 s->stream_decompression_method);
2051 if (!grpc_stream_decompress(
2052 s->stream_decompression_ctx, &s->frame_storage,
2053 &s->unprocessed_incoming_frames_buffer, nullptr,
2054 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
/* Decompression failure: drop all buffered data and flag the error. */
2055 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
2056 grpc_slice_buffer_reset_and_unref_internal(
2057 &s->unprocessed_incoming_frames_buffer);
2058 s->seen_error = true;
2060 if (s->unprocessed_incoming_frames_buffer.length > 0) {
2061 s->unprocessed_incoming_frames_decompressed = true;
2062 pending_data = true;
2064 if (end_of_context) {
2065 grpc_stream_compression_context_destroy(
2066 s->stream_decompression_ctx);
2067 s->stream_decompression_ctx = nullptr;
/* Everything drained: publish trailing metadata and hand back stats. */
2072 if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
2073 s->recv_trailing_metadata_finished != nullptr) {
2074 grpc_transport_move_stats(&s->stats, s->collecting_stats);
2075 s->collecting_stats = nullptr;
2076 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2077 s->recv_trailing_metadata);
2078 null_then_sched_closure(&s->recv_trailing_metadata_finished);
/* Detach stream `id` from the transport's stream map, fail any pending byte
   stream read with `error` (or "Truncated message"), and if this was the last
   stream, run the benign reclaimer and possibly finish a pending GOAWAY
   shutdown.  Takes ownership of `error`. */
2083 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2084 grpc_error* error) {
2085 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2086 grpc_chttp2_stream_map_delete(&t->stream_map, id));
2087 GPR_DEBUG_ASSERT(s);
/* If the parser was mid-frame on this stream, skip the rest of the frame. */
2088 if (t->incoming_stream == s) {
2089 t->incoming_stream = nullptr;
2090 grpc_chttp2_parsing_become_skip_parser(t);
2092 if (s->pending_byte_stream) {
2093 if (s->on_next != nullptr) {
2094 grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2095 if (error == GRPC_ERROR_NONE) {
2096 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2098 bs->PublishError(error);
2100 s->data_parser.parsing_frame = nullptr;
/* No reader waiting: stash the error for the next byte-stream pull. */
2102 GRPC_ERROR_UNREF(s->byte_stream_error);
2103 s->byte_stream_error = GRPC_ERROR_REF(error);
2107 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2108 post_benign_reclaimer(t);
2109 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2110 close_transport_locked(
2111 t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2112 "Last stream closed after sending GOAWAY", &error, 1));
2115 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2116 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2119 GRPC_ERROR_UNREF(error);
/* A freed stream id may allow a queued stream to start. */
2121 maybe_start_some_streams(t);
/* Cancel a stream with `due_to_error` (ownership taken).  On the server, an
   error with a clear gRPC status is turned into a proper trailers-carrying
   close via close_from_api; otherwise we emit RST_STREAM (when the stream is
   still open in either direction) and mark the stream fully closed. */
2124 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2125 grpc_error* due_to_error) {
2126 if (!t->is_client && !s->sent_trailing_metadata &&
2127 grpc_error_has_clear_grpc_status(due_to_error)) {
2128 close_from_api(t, s, due_to_error);
2132 if (!s->read_closed || !s->write_closed) {
2134 grpc_http2_error_code http_error;
2135 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2136 &http_error, nullptr);
2137 grpc_chttp2_add_rst_stream_to_next_write(
2138 t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2139 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2142 if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2143 s->seen_error = true;
/* Close both directions; mark_stream_closed takes ownership of the error. */
2145 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
/* Synthesize grpc-status / grpc-message trailing metadata from `error` for a
   stream that is closing without real trailers (or whose trailers should be
   overridden).  Takes ownership of `error`. */
2148 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2149 grpc_error* error) {
2150 grpc_status_code status;
2152 grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2153 if (status != GRPC_STATUS_OK) {
2154 s->seen_error = true;
2156 /* stream_global->recv_trailing_metadata_finished gives us a
2157 last chance replacement: we've received trailing metadata,
2158 but something more important has become available to signal
2159 to the upper layers - drop what we've got, and then publish
2160 what we want - which is safe because we haven't told anyone
2161 about the metadata yet */
2162 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2163 s->recv_trailing_metadata_finished != nullptr) {
2164 char status_string[GPR_LTOA_MIN_BUFSIZE];
2165 gpr_ltoa(status, status_string);
2166 GRPC_LOG_IF_ERROR("add_status",
2167 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2168 &s->metadata_buffer[1],
2169 grpc_mdelem_from_slices(
2170 GRPC_MDSTR_GRPC_STATUS,
2171 grpc_core::UnmanagedMemorySlice(status_string))));
/* Only attach grpc-message when the error carried a message slice. */
2172 if (!GRPC_SLICE_IS_EMPTY(slice)) {
2174 "add_status_message",
2175 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2176 &s->metadata_buffer[1],
2177 grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2179 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2180 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2183 GRPC_ERROR_UNREF(error);
/* Collect `error` into the refs[] array used by removal_error, skipping
   GRPC_ERROR_NONE and duplicates already present. */
2186 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2187 if (error == GRPC_ERROR_NONE) return;
2188 for (size_t i = 0; i < *nrefs; i++) {
2189 if (error == refs[i]) {
2193 refs[*nrefs] = error;
/* Build a single error describing why stream `s` is being removed, combining
   its read/write close errors with `extra_error` (ownership of extra_error is
   taken).  Returns GRPC_ERROR_NONE when there is nothing to report. */
2197 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2198 const char* master_error_msg) {
2199 grpc_error* refs[3];
2201 add_error(s->read_closed_error, refs, &nrefs);
2202 add_error(s->write_closed_error, refs, &nrefs);
2203 add_error(extra_error, refs, &nrefs);
2204 grpc_error* error = GRPC_ERROR_NONE;
2206 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2209 GRPC_ERROR_UNREF(extra_error);
/* Complete every queued write callback in *list with `error` (ownership
   taken) and return the cb nodes to the transport's free pool. */
2213 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2214 grpc_chttp2_write_cb** list, grpc_error* error) {
2216 grpc_chttp2_write_cb* cb = *list;
2218 grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2219 "on_write_finished_cb");
/* Recycle the node into the transport-wide pool. */
2220 cb->next = t->write_cb_pool;
2221 t->write_cb_pool = cb;
2223 GRPC_ERROR_UNREF(error);
/* Fail every outstanding send operation on `s` (initial metadata, trailing
   metadata, message fetch, flow-controlled and plain write callbacks) with an
   error derived from the stream's close state.  Takes ownership of `error`. */
2226 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2227 grpc_chttp2_stream* s, grpc_error* error) {
2229 removal_error(error, s, "Pending writes failed due to stream closure");
2230 s->send_initial_metadata = nullptr;
2231 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2232 GRPC_ERROR_REF(error),
2233 "send_initial_metadata_finished");
2235 s->send_trailing_metadata = nullptr;
2236 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2237 GRPC_ERROR_REF(error),
2238 "send_trailing_metadata_finished");
2240 s->fetching_send_message.reset();
2241 grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2242 GRPC_ERROR_REF(error),
2243 "fetching_send_message_finished");
/* Last flush_write_list call consumes the remaining ref on `error`. */
2244 flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2245 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
/* Mark one or both directions of `s` closed with `error` (ownership taken).
   When both directions become closed the stream is removed from the
   transport, a fake status may be synthesized, and the transport's stream ref
   is dropped.  Idempotent for already fully-closed streams. */
2248 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2249 grpc_chttp2_stream* s, int close_reads,
2250 int close_writes, grpc_error* error) {
2251 if (s->read_closed && s->write_closed) {
2252 /* already closed */
2253 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2254 GRPC_ERROR_UNREF(error);
2257 bool closed_read = false;
2258 bool became_closed = false;
2259 if (close_reads && !s->read_closed) {
2260 s->read_closed_error = GRPC_ERROR_REF(error);
2261 s->read_closed = true;
2264 if (close_writes && !s->write_closed) {
2265 s->write_closed_error = GRPC_ERROR_REF(error);
2266 s->write_closed = true;
2267 grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2269 if (s->read_closed && s->write_closed) {
2270 became_closed = true;
2271 grpc_error* overall_error =
2272 removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2274 remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2276 /* Purge streams waiting on concurrency still waiting for id assignment */
2277 grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2279 if (overall_error != GRPC_ERROR_NONE) {
2280 grpc_chttp2_fake_status(t, s, overall_error);
/* Anything not yet published is published "at close" so waiters finish. */
2284 for (int i = 0; i < 2; i++) {
2285 if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2286 s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2289 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2290 grpc_chttp2_maybe_complete_recv_message(t, s);
2292 if (became_closed) {
2293 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2294 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2296 GRPC_ERROR_UNREF(error);
/* Server-side "clean close with status": hand-roll an uncompressed HEADERS
   frame carrying :status/content-type (if initial metadata wasn't sent yet)
   plus grpc-status and grpc-message, queue it with a RST_STREAM(NO_ERROR),
   and mark the stream closed.  Takes ownership of `error`. */
2299 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2300 grpc_error* error) {
2302 grpc_slice status_hdr;
2303 grpc_slice http_status_hdr;
2304 grpc_slice content_type_hdr;
2305 grpc_slice message_pfx;
2308 grpc_status_code grpc_status;
2310 grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2313 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2315 /* Hand roll a header block.
2316 This is unnecessarily ugly - at some point we should find a more
2318 It's complicated by the fact that our send machinery would be dead by
2319 the time we got around to sending this, so instead we ignore HPACK
2320 compression and just write the uncompressed bytes onto the wire. */
2321 if (!s->sent_initial_metadata) {
2322 http_status_hdr = GRPC_SLICE_MALLOC(13);
2323 p = GRPC_SLICE_START_PTR(http_status_hdr);
2337 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2338 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2340 content_type_hdr = GRPC_SLICE_MALLOC(31);
2341 p = GRPC_SLICE_START_PTR(content_type_hdr);
2373 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2374 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
/* grpc-status: one extra byte when the status has two digits. */
2377 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2378 p = GRPC_SLICE_START_PTR(status_hdr);
2379 *p++ = 0x00; /* literal header, not indexed */
2380 *p++ = 11; /* len(grpc-status) */
2392 if (grpc_status < 10) {
2394 *p++ = static_cast<uint8_t>('0' + grpc_status);
2397 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2398 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2400 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2401 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
/* grpc-message: header-name prefix plus HPACK varint-encoded value length;
   the message bytes themselves are appended as a separate slice below. */
2403 size_t msg_len = GRPC_SLICE_LENGTH(slice);
2404 GPR_ASSERT(msg_len <= UINT32_MAX);
2405 uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2406 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2407 p = GRPC_SLICE_START_PTR(message_pfx);
2408 *p++ = 0x00; /* literal header, not indexed */
2409 *p++ = 12; /* len(grpc-message) */
2422 GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2424 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2425 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2426 len += static_cast<uint32_t>(msg_len);
/* 9-byte HTTP/2 frame header: 24-bit length, type, flags, 31-bit stream id. */
2428 hdr = GRPC_SLICE_MALLOC(9);
2429 p = GRPC_SLICE_START_PTR(hdr);
2430 *p++ = static_cast<uint8_t>(len >> 16);
2431 *p++ = static_cast<uint8_t>(len >> 8);
2432 *p++ = static_cast<uint8_t>(len);
2433 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2434 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2435 *p++ = static_cast<uint8_t>(s->id >> 24);
2436 *p++ = static_cast<uint8_t>(s->id >> 16);
2437 *p++ = static_cast<uint8_t>(s->id >> 8);
2438 *p++ = static_cast<uint8_t>(s->id);
2439 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2441 grpc_slice_buffer_add(&t->qbuf, hdr);
2442 if (!s->sent_initial_metadata) {
2443 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2444 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2446 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2447 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2448 grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2449 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2450 &s->stats.outgoing);
/* mark_stream_closed consumes `error`. */
2452 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2453 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2458 grpc_chttp2_transport* t;
2459 } cancel_stream_cb_args;
/* stream-map iteration callback: cancel one stream with a ref on the shared
   error carried in cancel_stream_cb_args. */
2461 static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
2462 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2463 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2464 grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
/* Cancel every active stream on the transport with `error` (ownership
   taken).  On the server, an error lacking both a gRPC status and an HTTP/2
   error code is normalized to UNAVAILABLE first. */
2467 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2468 intptr_t http2_error;
2469 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2470 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2471 !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2472 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2473 GRPC_STATUS_UNAVAILABLE);
2475 cancel_stream_cb_args args = {error, t};
2476 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2477 GRPC_ERROR_UNREF(error);
2480 /*******************************************************************************
2481 * INPUT PROCESSING - PARSING
/* Run `action` according to flow-control urgency: immediately (and initiate a
   write) for UPDATE_IMMEDIATELY, lazily for QUEUE_UPDATE, not at all for
   NO_ACTION_NEEDED. */
2485 static void WithUrgency(grpc_chttp2_transport* t,
2486 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2487 grpc_chttp2_initiate_write_reason reason, F action) {
2489 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2491 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2492 grpc_chttp2_initiate_write(t, reason);
2494 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
/* Translate a FlowControlAction into concrete transport work: stream window
   update, transport window update, and SETTINGS changes for initial window
   size and max frame size — each gated on the action's per-item urgency. */
2500 void grpc_chttp2_act_on_flowctl_action(
2501 const grpc_core::chttp2::FlowControlAction& action,
2502 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2503 WithUrgency(t, action.send_stream_update(),
2504 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2505 [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
/* Transport-level update needs no extra work beyond initiating the write. */
2506 WithUrgency(t, action.send_transport_update(),
2507 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2508 WithUrgency(t, action.send_initial_window_update(),
2509 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2510 queue_setting_update(t,
2511 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2512 action.initial_window_size());
2514 WithUrgency(t, action.send_max_frame_size_update(),
2515 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2516 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2517 action.max_frame_size());
/* After an HTTP/2 parse failure, try parsing the read buffer as an HTTP/1.x
   response.  If it parses cleanly, we were likely talking to an http1 server;
   return an UNAVAILABLE error annotated with the HTTP status.  Otherwise
   return GRPC_ERROR_NONE so the original HTTP/2 error stands. */
2521 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2522 grpc_http_parser parser;
2524 grpc_error* error = GRPC_ERROR_NONE;
2525 grpc_http_response response;
2527 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2529 grpc_error* parse_error = GRPC_ERROR_NONE;
2530 for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2532 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2534 if (parse_error == GRPC_ERROR_NONE &&
2535 (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2536 error = grpc_error_set_int(
2537 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2538 "Trying to connect an http1.x server"),
2539 GRPC_ERROR_INT_HTTP_STATUS, response.status),
2540 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2542 GRPC_ERROR_UNREF(parse_error);
2544 grpc_http_parser_destroy(&parser);
2545 grpc_http_response_destroy(&response);
/* Endpoint read completion, running under the combiner: feed each read slice
   through the HTTP/2 parser (and the BDP estimator), apply any settings-driven
   window updates, then either close the transport on error or re-arm the next
   read.  Also resets the keepalive timer since bytes were received. */
2549 static void read_action_locked(void* tp, grpc_error* error) {
2550 GPR_TIMER_SCOPE("reading_action_locked", 0);
2552 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2554 GRPC_ERROR_REF(error);
2556 grpc_error* err = error;
2557 if (err != GRPC_ERROR_NONE) {
2558 err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2559 "Endpoint read failed", &err, 1),
2560 GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
/* Keep `error` as the (possibly wrapped) error; drop the old value. */
2563 GPR_SWAP(grpc_error*, err, error);
2564 GRPC_ERROR_UNREF(err);
2565 if (t->closed_with_error == GRPC_ERROR_NONE) {
2566 GPR_TIMER_SCOPE("reading_action.parse", 0);
2568 grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2570 for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2571 grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
2573 bdp_est->AddIncomingBytes(
2574 static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
2576 errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
/* HTTP/2 parse failed: check whether the peer is actually http1.x. */
2578 if (errors[1] != GRPC_ERROR_NONE) {
2579 errors[2] = try_http_parsing(t);
2580 GRPC_ERROR_UNREF(error);
2581 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2582 "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2584 for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2585 GRPC_ERROR_UNREF(errors[i]);
2588 GPR_TIMER_SCOPE("post_parse_locked", 0);
/* A SETTINGS frame changed the peer's initial window: un-stall streams
   that were blocked on stream-level flow control. */
2589 if (t->initial_window_update != 0) {
2590 if (t->initial_window_update > 0) {
2591 grpc_chttp2_stream* s;
2592 while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2593 grpc_chttp2_mark_stream_writable(t, s);
2594 grpc_chttp2_initiate_write(
2595 t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2598 t->initial_window_update = 0;
2602 GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2603 bool keep_reading = false;
2604 if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2605 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2606 "Transport closed", &t->closed_with_error, 1);
2608 if (error != GRPC_ERROR_NONE) {
2609 /* If a goaway frame was received, this might be the reason why the read
2610 * failed. Add this info to the error */
2611 if (t->goaway_error != GRPC_ERROR_NONE) {
2612 error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2615 close_transport_locked(t, GRPC_ERROR_REF(error));
2616 t->endpoint_reading = 0;
2617 } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2618 keep_reading = true;
2619 /* Since we have read a byte, reset the keepalive timer */
2620 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2621 grpc_timer_cancel(&t->keepalive_ping_timer);
2624 grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
/* Backpressure: pause reading while too many induced frames (SETTINGS ACK,
   RST_STREAM) are waiting to be written. */
2627 if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2628 t->reading_paused_on_pending_induced_frames = true;
2629 GRPC_CHTTP2_IF_TRACING(
2631 "transport %p : Pausing reading due to too "
2632 "many unwritten SETTINGS ACK and RST_STREAM frames",
2635 continue_read_action_locked(t);
2638 GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2641 GRPC_ERROR_UNREF(error);
/* Re-arm the endpoint read (urgent when a GOAWAY was received, so pending
   data is surfaced quickly) and apply any resulting flow-control action. */
2644 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2645 const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2646 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2647 grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2650 // t is reffed prior to calling the first time, and once the callback chain
2651 // that kicks off finishes, it's unreffed
/* Queue a BDP-measurement ping using the transport's dedicated start/finish
   closures. */
2652 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2653 t->flow_control->bdp_estimator()->SchedulePing();
2654 send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
/* on_initiate callback for the BDP ping: the ping frame is being written.
   Starts the estimator's measurement window; no-op if the transport errored
   or closed. */
2657 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2658 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2659 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2660 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2661 grpc_error_string(error));
2663 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2666 /* Reset the keepalive ping timer */
2667 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2668 grpc_timer_cancel(&t->keepalive_ping_timer);
2670 t->flow_control->bdp_estimator()->StartPing();
/* on_ack callback for the BDP ping: complete the measurement, apply the
   periodic flow-control update, and arm the timer for the next BDP ping.
   On error/close, just drop the bdp_ping transport ref. */
2673 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2674 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2675 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2676 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2677 grpc_error_string(error));
2679 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2680 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2683 grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2684 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
/* The timer keeps the bdp_ping ref alive until it fires or is cancelled. */
2686 GPR_ASSERT(!t->have_next_bdp_ping_timer);
2687 t->have_next_bdp_ping_timer = true;
2688 grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2689 &t->next_bdp_ping_timer_expired_locked);
/* BDP ping timer fired: schedule the next BDP ping, or (on cancellation /
   error) release the bdp_ping transport ref. */
2692 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2693 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2694 GPR_ASSERT(t->have_next_bdp_ping_timer);
2695 t->have_next_bdp_ping_timer = false;
2696 if (error != GRPC_ERROR_NONE) {
2697 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2700 schedule_bdp_ping_locked(t);
2703 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2707 for (i = 0; i < args->num_args; i++) {
2708 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2709 const int value = grpc_channel_arg_get_integer(
2710 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2711 : g_default_server_keepalive_time_ms,
2714 g_default_client_keepalive_time_ms = value;
2716 g_default_server_keepalive_time_ms = value;
2719 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2720 const int value = grpc_channel_arg_get_integer(
2721 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2722 : g_default_server_keepalive_timeout_ms,
2725 g_default_client_keepalive_timeout_ms = value;
2727 g_default_server_keepalive_timeout_ms = value;
2729 } else if (0 == strcmp(args->args[i].key,
2730 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2731 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2733 {is_client ? g_default_client_keepalive_permit_without_calls
2734 : g_default_server_keepalive_timeout_ms,
2737 g_default_client_keepalive_permit_without_calls = value;
2739 g_default_server_keepalive_permit_without_calls = value;
2742 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2743 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2744 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2745 } else if (0 == strcmp(args->args[i].key,
2746 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2747 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2748 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2752 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2753 g_default_min_sent_ping_interval_without_data_ms =
2754 grpc_channel_arg_get_integer(
2756 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2760 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2761 g_default_min_recv_ping_interval_without_data_ms =
2762 grpc_channel_arg_get_integer(
2764 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
// Keepalive timer callback: either sends a keepalive ping now, re-arms the
// timer, or moves the keepalive state machine to DYING on shutdown.
2770 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2771 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2772 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2773 if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2774 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2775 } else if (error == GRPC_ERROR_NONE) {
// Only ping when permitted without calls or at least one stream is live.
2776 if (t->keepalive_permit_without_calls ||
2777 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2778 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2779 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2780 grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2781 send_keepalive_ping_locked(t);
2782 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
// Nothing to ping for right now: re-arm the keepalive timer instead.
2784 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2785 grpc_timer_init(&t->keepalive_ping_timer,
2786 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2787 &t->init_keepalive_ping_locked);
2789 } else if (error == GRPC_ERROR_CANCELLED) {
2790 /* The keepalive ping timer may be cancelled by bdp */
2791 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2792 grpc_timer_init(&t->keepalive_ping_timer,
2793 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2794 &t->init_keepalive_ping_locked);
// Balances the ref taken when this timer instance was armed.
2796 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
// Called when the keepalive ping is about to be written: records channelz
// stats and arms the watchdog that will close the transport if no ack comes.
2799 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2800 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
// Bail out if the ping could not be initiated.
2801 if (error != GRPC_ERROR_NONE) {
2804 if (t->channelz_socket != nullptr) {
2805 t->channelz_socket->RecordKeepaliveSent();
2807 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2808 gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
2810 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2811 grpc_timer_init(&t->keepalive_watchdog_timer,
2812 grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2813 &t->keepalive_watchdog_fired_locked);
// Called when the keepalive ping is acked: cancels the watchdog, returns the
// state machine to WAITING, and re-arms the periodic keepalive timer.
2816 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2817 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2818 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2819 if (error == GRPC_ERROR_NONE) {
2820 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2821 gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
2823 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2824 grpc_timer_cancel(&t->keepalive_watchdog_timer);
2825 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2826 grpc_timer_init(&t->keepalive_ping_timer,
2827 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2828 &t->init_keepalive_ping_locked);
// Balances the "keepalive ping end" ref taken when the ping was started.
2831 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
// Watchdog timer callback: if the keepalive ping was never acked before the
// timeout, close the transport with UNAVAILABLE.
2834 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2835 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2836 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2837 if (error == GRPC_ERROR_NONE) {
2838 gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
2840 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2841 close_transport_locked(
2842 t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2843 "keepalive watchdog timeout"),
2844 GRPC_ERROR_INT_GRPC_STATUS,
2845 GRPC_STATUS_UNAVAILABLE));
2848 /* The watchdog timer should have been cancelled by
2849 * finish_keepalive_ping_locked. */
// Anything other than CANCELLED here indicates a state-machine bug.
2850 if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2851 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2852 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2855 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2858 /*******************************************************************************
// Publishes a new connectivity state to watchers of this transport's channel.
2862 static void connectivity_state_set(grpc_chttp2_transport* t,
2863 grpc_connectivity_state state,
2864 const char* reason) {
2865 GRPC_CHTTP2_IF_TRACING(
2866 gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2867 grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
2871 /*******************************************************************************
2875 static void set_pollset(grpc_transport* gt, grpc_stream* gs,
2876 grpc_pollset* pollset) {
2877 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2878 grpc_endpoint_add_to_pollset(t->ep, pollset);
2881 static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
2882 grpc_pollset_set* pollset_set) {
2883 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2884 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2887 /*******************************************************************************
// Combiner callback run when the incoming byte stream must be reset; on
// error it fails any pending Next() and cancels the stream.
2891 static void reset_byte_stream(void* arg, grpc_error* error) {
2892 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2893 s->pending_byte_stream = false;
2894 if (error == GRPC_ERROR_NONE) {
2895 grpc_chttp2_maybe_complete_recv_message(s->t, s);
2896 grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2898 GPR_ASSERT(error != GRPC_ERROR_NONE);
// Deliver the failure to the pending Next() callback, then cancel the stream.
2899 GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
2900 s->on_next = nullptr;
2901 GRPC_ERROR_UNREF(s->byte_stream_error);
2902 s->byte_stream_error = GRPC_ERROR_NONE;
2903 grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2904 s->byte_stream_error = GRPC_ERROR_REF(error);
2908 namespace grpc_core {
// Constructs an incoming byte stream for one message of `frame_size` bytes
// arriving on `stream`.
2910 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2911 grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2912 uint32_t frame_size, uint32_t flags)
2913 : ByteStream(frame_size, flags),
2914 transport_(transport),
2917 remaining_bytes_(frame_size) {
// Clear any stale byte-stream error left over from a previous message.
2918 GRPC_ERROR_UNREF(stream->byte_stream_error);
2919 stream->byte_stream_error = GRPC_ERROR_NONE;
// Runs under the transport combiner when the byte stream is orphaned: clears
// the stream's pending-byte-stream flag and re-checks recv completion.
2922 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2923 grpc_error* error_ignored) {
2924 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2925 grpc_chttp2_stream* s = bs->stream_;
2926 grpc_chttp2_transport* t = s->t;
2928 s->pending_byte_stream = false;
2929 grpc_chttp2_maybe_complete_recv_message(t, s);
2930 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
// Public orphan entry point: hops onto the transport combiner so the actual
// teardown happens in OrphanLocked under the lock.
2933 void Chttp2IncomingByteStream::Orphan() {
2934 GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2936 GRPC_CLOSURE_INIT(&destroy_action_,
2937 &Chttp2IncomingByteStream::OrphanLocked, this,
2938 grpc_combiner_scheduler(transport_->combiner)),
// Combiner-side implementation of Next(): hands any buffered frames to the
// caller, surfaces stored errors, or parks the completion in s->on_next to be
// fired when more data arrives.
2942 void Chttp2IncomingByteStream::NextLocked(void* arg,
2943 grpc_error* error_ignored) {
2944 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2945 grpc_chttp2_transport* t = bs->transport_;
2946 grpc_chttp2_stream* s = bs->stream_;
2947 size_t cur_length = s->frame_storage.length;
// While the stream is open, feed the size hint into flow control so window
// updates can be issued.
2948 if (!s->read_closed) {
2949 s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2951 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2953 GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
// Data is already buffered: move it to the unprocessed buffer and complete.
2954 if (s->frame_storage.length > 0) {
2955 grpc_slice_buffer_swap(&s->frame_storage,
2956 &s->unprocessed_incoming_frames_buffer);
2957 s->unprocessed_incoming_frames_decompressed = false;
2958 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
// A stored stream error takes precedence over waiting for more frames.
2959 } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2960 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2961 GRPC_ERROR_REF(s->byte_stream_error));
2962 if (s->data_parser.parsing_frame != nullptr) {
2963 s->data_parser.parsing_frame->Unref();
2964 s->data_parser.parsing_frame = nullptr;
// Reads are closed with bytes still owed: the message was truncated.
2966 } else if (s->read_closed) {
2967 if (bs->remaining_bytes_ != 0) {
2968 s->byte_stream_error =
2969 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2970 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2971 GRPC_ERROR_REF(s->byte_stream_error));
2972 if (s->data_parser.parsing_frame != nullptr) {
2973 s->data_parser.parsing_frame->Unref();
2974 s->data_parser.parsing_frame = nullptr;
2977 /* Should never reach here. */
// No data yet: park the completion; it fires when more frames arrive.
2981 s->on_next = bs->next_action_.on_complete;
// Returns true when buffered data is immediately available for Pull();
// otherwise records the request and schedules NextLocked on the combiner.
2986 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2987 grpc_closure* on_complete) {
2988 GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2989 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2993 next_action_.max_size_hint = max_size_hint;
2994 next_action_.on_complete = on_complete;
2996 GRPC_CLOSURE_INIT(&next_action_.closure,
2997 &Chttp2IncomingByteStream::NextLocked, this,
2998 grpc_combiner_scheduler(transport_->combiner)),
3004 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3005 GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3006 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3007 if (!stream_->stream_decompression_ctx) {
3008 stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3009 stream_->stream_decompression_method);
// Pulls one slice of message data for the application, transparently
// decompressing buffered frames when stream compression is in use.
3013 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3014 GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3016 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
// Decompress first if the buffered frames are still compressed.
3017 if (!stream_->unprocessed_incoming_frames_decompressed &&
3018 stream_->stream_decompression_method !=
3019 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3020 bool end_of_context;
3021 MaybeCreateStreamDecompressionCtx();
3022 if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3023 &stream_->unprocessed_incoming_frames_buffer,
3024 &stream_->decompressed_data_buffer, nullptr,
3025 MAX_SIZE_T, &end_of_context)) {
3027 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
// Swap the decompressed bytes back so the deframer below consumes them
// uniformly from unprocessed_incoming_frames_buffer.
3030 GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3031 grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3032 &stream_->decompressed_data_buffer);
3033 stream_->unprocessed_incoming_frames_decompressed = true;
3034 if (end_of_context) {
3035 grpc_stream_compression_context_destroy(
3036 stream_->stream_decompression_ctx);
3037 stream_->stream_decompression_ctx = nullptr;
// Decompression may have consumed everything without producing output.
3039 if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3040 *slice = grpc_empty_slice();
3043 error = grpc_deframe_unprocessed_incoming_frames(
3044 &stream_->data_parser, stream_,
3045 &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3046 if (error != GRPC_ERROR_NONE) {
3050 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3051 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3054 return GRPC_ERROR_NONE;
3057 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3058 GPR_ASSERT(error != GRPC_ERROR_NONE);
3059 GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
3060 stream_->on_next = nullptr;
3061 GRPC_ERROR_UNREF(stream_->byte_stream_error);
3062 stream_->byte_stream_error = GRPC_ERROR_REF(error);
3063 grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
// Accounts an incoming DATA slice against the announced message length; fails
// the stream if the peer delivers more bytes than the frame header promised.
3066 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3067 grpc_slice* slice_out) {
3068 if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3070 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3071 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3072 grpc_slice_unref_internal(slice);
3075 remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3076 if (slice_out != nullptr) {
3079 return GRPC_ERROR_NONE;
// Finalizes the byte stream: a clean finish with bytes still outstanding is
// promoted to a "Truncated message" error; on error, optionally schedules
// reset_byte_stream to tear the stream down.
3083 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3084 bool reset_on_error) {
3085 if (error == GRPC_ERROR_NONE) {
3086 if (remaining_bytes_ != 0) {
3087 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3090 if (error != GRPC_ERROR_NONE && reset_on_error) {
3091 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3097 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3098 GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3101 } // namespace grpc_core
3103 /*******************************************************************************
3107 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3108 if (!t->benign_reclaimer_registered) {
3109 t->benign_reclaimer_registered = true;
3110 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3111 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3112 false, &t->benign_reclaimer_locked);
3116 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3117 if (!t->destructive_reclaimer_registered) {
3118 t->destructive_reclaimer_registered = true;
3119 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3120 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3121 true, &t->destructive_reclaimer_locked);
// Benign reclamation: when the resource quota signals memory pressure and the
// transport is idle (no active streams), send a GOAWAY so the connection can
// wind down cleanly and free its buffers.
3125 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3126 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3127 if (error == GRPC_ERROR_NONE &&
3128 grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3129 /* Channel with no active streams: send a goaway to try and make it
3130 * disconnect cleanly */
3131 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3132 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3137 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3138 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3139 } else if (error == GRPC_ERROR_NONE &&
3140 GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3142 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3144 t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3146 t->benign_reclaimer_registered = false;
// CANCELLED means the reclaimer was dropped without running, so there is no
// pending reclamation to mark finished.
3147 if (error != GRPC_ERROR_CANCELLED) {
3148 grpc_resource_user_finish_reclamation(
3149 grpc_endpoint_get_resource_user(t->ep));
3151 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
// Destructive reclamation: under heavy memory pressure, cancel one active
// stream (with ENHANCE_YOUR_CALM) per invocation to shed load.
3154 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3155 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3156 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3157 t->destructive_reclaimer_registered = false;
3158 if (error == GRPC_ERROR_NONE && n > 0) {
// Pick an arbitrary stream from the map to abandon.
3159 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3160 grpc_chttp2_stream_map_rand(&t->stream_map));
3161 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3162 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3165 grpc_chttp2_cancel_stream(
3167 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3168 GRPC_ERROR_INT_HTTP2_ERROR,
3169 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3171 /* Since we cancel one stream per destructive reclamation, if
3172 there are more streams left, we can immediately post a new
3173 reclaimer in case the resource quota needs to free more
3175 post_destructive_reclaimer(t);
// CANCELLED means the reclaimer was dropped without running; nothing to
// report finished in that case.
3178 if (error != GRPC_ERROR_CANCELLED) {
3179 grpc_resource_user_finish_reclamation(
3180 grpc_endpoint_get_resource_user(t->ep));
3182 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3185 /*******************************************************************************
// Maps a grpc_chttp2_initiate_write_reason enum value to a stable
// human-readable name for tracing and stats output.
3189 const char* grpc_chttp2_initiate_write_reason_string(
3190 grpc_chttp2_initiate_write_reason reason) {
3192 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3193 return "INITIAL_WRITE";
3194 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3195 return "START_NEW_STREAM";
3196 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3197 return "SEND_MESSAGE";
3198 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3199 return "SEND_INITIAL_METADATA";
3200 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3201 return "SEND_TRAILING_METADATA";
3202 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3203 return "RETRY_SEND_PING";
3204 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3205 return "CONTINUE_PINGS";
3206 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3207 return "GOAWAY_SENT";
3208 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3209 return "RST_STREAM";
3210 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3211 return "CLOSE_FROM_API";
3212 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3213 return "STREAM_FLOW_CONTROL";
3214 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3215 return "TRANSPORT_FLOW_CONTROL";
3216 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3217 return "SEND_SETTINGS";
3218 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3219 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3220 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3221 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3222 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3223 return "APPLICATION_PING";
3224 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3225 return "KEEPALIVE_PING";
3226 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3227 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3228 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3229 return "PING_RESPONSE";
3230 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3231 return "FORCE_RST_STREAM";
// Unreachable as long as the enum and this switch stay in sync.
3233 GPR_UNREACHABLE_CODE(return "unknown");
3236 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3237 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
// The function table that plugs chttp2 into the generic transport layer.
3240 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3246 perform_transport_op,
3249 chttp2_get_endpoint};
// Accessor for the file-static transport vtable above.
3251 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3253 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
3254 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3255 grpc_chttp2_transport* t =
3256 reinterpret_cast<grpc_chttp2_transport*>(transport);
3257 return t->channelz_socket;
// Allocates a new chttp2 transport wrapping `ep`, configured from
// `channel_args`, acting as client or server per `is_client`.
3260 grpc_transport* grpc_create_chttp2_transport(
3261 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3262 grpc_resource_user* resource_user) {
3263 auto t = grpc_core::New<grpc_chttp2_transport>(channel_args, ep, is_client,
// Hands the transport any bytes already consumed during handshaking and
// kicks off the read loop. `notify_on_receive_settings` is stored to be
// signalled once the peer's initial SETTINGS frame is processed.
3268 void grpc_chttp2_transport_start_reading(
3269 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3270 grpc_closure* notify_on_receive_settings) {
3271 grpc_chttp2_transport* t =
3272 reinterpret_cast<grpc_chttp2_transport*>(transport);
3273 GRPC_CHTTP2_REF_TRANSPORT(
3274 t, "reading_action"); /* matches unref inside reading_action */
3275 if (read_buffer != nullptr) {
// Take ownership of the handed-off buffer contents, then free its shell.
3276 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3277 gpr_free(read_buffer);
3279 t->notify_on_receive_settings = notify_on_receive_settings;
3280 GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);