Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / transport / chttp2 / transport / chttp2_transport.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <math.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
59
/* Compile-time tunables for the chttp2 transport. */
60 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
61 #define MAX_WINDOW 0x7fffffffu
   /* Upper bound accepted for GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE in
      read_channel_args(). */
62 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
   /* Value queued for SETTINGS_MAX_HEADER_LIST_SIZE by the transport
      constructor before channel args are applied. */
63 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
64
   /* Keepalive defaults. INT_MAX is translated to GRPC_MILLIS_INF_FUTURE by
      init_transport_keepalive_settings(); with an infinite keepalive_time no
      keepalive timer is armed (see init_keepalive_pings_if_enabled()). */
65 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
66 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
67 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
68 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
69 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
70 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
71
   /* Ping-policy defaults, copied into each transport by
      configure_transport_ping_policy(). */
72 #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
73 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
74 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
75 #define DEFAULT_MAX_PING_STRIKES 2
76
77 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
78
   /* Mutable file-scope defaults, initialised from the macros above. They are
      the fallback values used when a channel arg is absent or invalid. */
79 static int g_default_client_keepalive_time_ms =
80     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
81 static int g_default_client_keepalive_timeout_ms =
82     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
83 static int g_default_server_keepalive_time_ms =
84     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
85 static int g_default_server_keepalive_timeout_ms =
86     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
87 static bool g_default_client_keepalive_permit_without_calls =
88     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
89 static bool g_default_server_keepalive_permit_without_calls =
90     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
91
92 static int g_default_min_sent_ping_interval_without_data_ms =
93     DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
94 static int g_default_min_recv_ping_interval_without_data_ms =
95     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
96 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
97 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
98
99 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
    /* Trace flags; both are constructed disabled. grpc_trace_chttp2_refcount
       is handed to the transport's refcount in the constructor. */
100 grpc_core::TraceFlag grpc_http_trace(false, "http");
101 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
102                                                          "chttp2_refcount");
103
104 /* forward declarations of various callbacks that we'll build closures around */
105 static void write_action_begin_locked(void* t, grpc_error* error);
106 static void write_action(void* t, grpc_error* error);
107 static void write_action_end_locked(void* t, grpc_error* error);
108
109 static void read_action_locked(void* t, grpc_error* error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
111
112 static void complete_fetch_locked(void* gs, grpc_error* error);
113 /** Set a transport level setting, and push it to our peer */
114 static void queue_setting_update(grpc_chttp2_transport* t,
115                                  grpc_chttp2_setting_id id, uint32_t value);
116
117 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
118                            grpc_error* error);
119
120 /** Start new streams that have been created if we can */
121 static void maybe_start_some_streams(grpc_chttp2_transport* t);
122
123 static void connectivity_state_set(grpc_chttp2_transport* t,
124                                    grpc_connectivity_state state,
125                                    const char* reason);
126
    /* Reclaimer callbacks; wrapped in combiner-scheduled closures by
       init_transport_closures(). */
127 static void benign_reclaimer_locked(void* t, grpc_error* error);
128 static void destructive_reclaimer_locked(void* t, grpc_error* error);
129
130 static void post_benign_reclaimer(grpc_chttp2_transport* t);
131 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
132
133 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
134 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
135
    /* BDP ping callbacks; the three void* variants are wrapped in
       combiner-scheduled closures by init_transport_closures(). */
136 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
137 static void start_bdp_ping_locked(void* tp, grpc_error* error);
138 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
139 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
140
141 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
142 static void send_ping_locked(grpc_chttp2_transport* t,
143                              grpc_closure* on_initiate,
144                              grpc_closure* on_complete);
145 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
146
147 /** keepalive-relevant functions */
148 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
149 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
150 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
151 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
152
153 static void reset_byte_stream(void* arg, grpc_error* error);
154
155 // Flow control default enabled. Can be disabled by setting
156 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
    // The transport constructor reads this flag to choose between
    // TransportFlowControl and TransportFlowControlDisabled.
157 bool g_flow_control_enabled = true;
158
159 /*******************************************************************************
160  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
161  */
162
/* Tears down a transport: releases the endpoint, all slice buffers, the HPACK
   compressor/parser, the goaway parser, the stream map, the connectivity
   tracker, the pooled write callbacks and the owned strings/errors.
   Asserts that every stream list and the stream map are already empty. */
163 grpc_chttp2_transport::~grpc_chttp2_transport() {
164   size_t i;
165
166   if (channelz_socket != nullptr) {
167     channelz_socket.reset();
168   }
169
170   grpc_endpoint_destroy(ep);
171
172   grpc_slice_buffer_destroy_internal(&qbuf);
173
174   grpc_slice_buffer_destroy_internal(&outbuf);
175   grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
176
      // Run the pending context list with a "Transport destroyed" error; the
      // error ref created here is released right after the call.
177   grpc_error* error =
178       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
179   // ContextList::Execute follows semantics of a callback function and does not
180   // take a ref on error
181   grpc_core::ContextList::Execute(cl, nullptr, error);
182   GRPC_ERROR_UNREF(error);
183   cl = nullptr;
184
185   grpc_slice_buffer_destroy_internal(&read_buffer);
186   grpc_chttp2_hpack_parser_destroy(&hpack_parser);
187   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
188
      // Every stream list must be empty by the time the transport is destroyed.
189   for (i = 0; i < STREAM_LIST_COUNT; i++) {
190     GPR_ASSERT(lists[i].head == nullptr);
191     GPR_ASSERT(lists[i].tail == nullptr);
192   }
193
194   GRPC_ERROR_UNREF(goaway_error);
195
196   GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
197
198   grpc_chttp2_stream_map_destroy(&stream_map);
199   grpc_connectivity_state_destroy(&channel_callback.state_tracker);
200
      // NOTE(review): the combiner ref is dropped before cancel_pings() runs
      // on this transport; confirm cancel_pings() does not rely on the
      // combiner still being referenced.
201   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
202
203   cancel_pings(this,
204                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
205
      // Free the singly linked pool of write callbacks.
206   while (write_cb_pool) {
207     grpc_chttp2_write_cb* next = write_cb_pool->next;
208     gpr_free(write_cb_pool);
209     write_cb_pool = next;
210   }
211
212   flow_control.Destroy();
213
214   GRPC_ERROR_UNREF(closed_with_error);
215   gpr_free(ping_acks);
216   gpr_free(peer_string);
217 }
218
/* Forward declaration: returns the transport vtable. Used below for
   base.vtable in the constructor and for the channelz socket name in
   read_channel_args(). */
219 static const grpc_transport_vtable* get_vtable(void);
220
/* Applies channel arguments to a transport under construction.
   Walks channel_args once; each recognised key overrides the matching
   transport field (stream id seed, HPACK encoder size, ping policy, write
   buffer size, keepalive, optimization target, channelz), and keys listed in
   settings_map are queued as HTTP/2 settings via queue_setting_update().
   Unrecognised keys are ignored. If channelz ends up enabled, a SocketNode is
   created for the transport.
   t            - transport being configured.
   channel_args - arguments to apply; dereferenced unconditionally, so the
                  caller must pass a non-null pointer.
   is_client    - selects the per-setting availability column and the wording
                  of log messages. */
221 /* Returns whether bdp is enabled */
222 static bool read_channel_args(grpc_chttp2_transport* t,
223                               const grpc_channel_args* channel_args,
224                               bool is_client) {
225   bool enable_bdp = true;
226   bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
227   size_t i;
228   int j;
229
230   for (i = 0; i < channel_args->num_args; i++) {
231     if (0 == strcmp(channel_args->args[i].key,
232                     GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
233       const grpc_integer_options options = {-1, 0, INT_MAX};
234       const int value =
235           grpc_channel_arg_get_integer(&channel_args->args[i], options);
236       if (value >= 0) {
            // The requested seed must have the same low bit as the current
            // next_stream_id (1 for clients, 2 for servers, as set by the
            // constructor); otherwise it is rejected with an error log.
237         if ((t->next_stream_id & 1) != (value & 1)) {
238           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
239                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
240                   is_client ? "client" : "server");
241         } else {
242           t->next_stream_id = static_cast<uint32_t>(value);
243         }
244       }
245     } else if (0 == strcmp(channel_args->args[i].key,
246                            GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
247       const grpc_integer_options options = {-1, 0, INT_MAX};
248       const int value =
249           grpc_channel_arg_get_integer(&channel_args->args[i], options);
250       if (value >= 0) {
251         grpc_chttp2_hpack_compressor_set_max_usable_size(
252             &t->hpack_compressor, static_cast<uint32_t>(value));
253       }
254     } else if (0 == strcmp(channel_args->args[i].key,
255                            GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
256       t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
257           &channel_args->args[i],
258           {g_default_max_pings_without_data, 0, INT_MAX});
259     } else if (0 == strcmp(channel_args->args[i].key,
260                            GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
261       t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
262           &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
263     } else if (0 ==
264                strcmp(channel_args->args[i].key,
265                       GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
266       t->ping_policy.min_sent_ping_interval_without_data =
267           grpc_channel_arg_get_integer(
268               &channel_args->args[i],
269               grpc_integer_options{
270                   g_default_min_sent_ping_interval_without_data_ms, 0,
271                   INT_MAX});
272     } else if (0 ==
273                strcmp(channel_args->args[i].key,
274                       GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
275       t->ping_policy.min_recv_ping_interval_without_data =
276           grpc_channel_arg_get_integer(
277               &channel_args->args[i],
278               grpc_integer_options{
279                   g_default_min_recv_ping_interval_without_data_ms, 0,
280                   INT_MAX});
281     } else if (0 == strcmp(channel_args->args[i].key,
282                            GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
283       t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
284           &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
285     } else if (0 ==
286                strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
287       enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
288     } else if (0 ==
289                strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
290       const int value = grpc_channel_arg_get_integer(
291           &channel_args->args[i],
292           grpc_integer_options{t->is_client
293                                    ? g_default_client_keepalive_time_ms
294                                    : g_default_server_keepalive_time_ms,
295                                1, INT_MAX});
          // INT_MAX is the sentinel for "never".
296       t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
297     } else if (0 == strcmp(channel_args->args[i].key,
298                            GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
299       const int value = grpc_channel_arg_get_integer(
300           &channel_args->args[i],
301           grpc_integer_options{t->is_client
302                                    ? g_default_client_keepalive_timeout_ms
303                                    : g_default_server_keepalive_timeout_ms,
304                                0, INT_MAX});
305       t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
306     } else if (0 == strcmp(channel_args->args[i].key,
307                            GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
308       t->keepalive_permit_without_calls = static_cast<uint32_t>(
309           grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
310     } else if (0 == strcmp(channel_args->args[i].key,
311                            GRPC_ARG_OPTIMIZATION_TARGET)) {
          // NOTE(review): "blend" currently selects the same target as
          // "latency". In the unknown-value branch the log says "assuming
          // 'blend'" but opt_target is left at whatever it already was.
312       if (channel_args->args[i].type != GRPC_ARG_STRING) {
313         gpr_log(GPR_ERROR, "%s should be a string",
314                 GRPC_ARG_OPTIMIZATION_TARGET);
315       } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
316         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
317       } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
318         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
319       } else if (0 ==
320                  strcmp(channel_args->args[i].value.string, "throughput")) {
321         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
322       } else {
323         gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
324                 GRPC_ARG_OPTIMIZATION_TARGET,
325                 channel_args->args[i].value.string);
326       }
327     } else if (0 ==
328                strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
329       channelz_enabled = grpc_channel_arg_get_bool(
330           &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
331     } else {
          // Table of channel args that map directly onto an HTTP/2 setting.
          // integer_options is {default, min, max}; a default of -1 means
          // "do not queue the setting" because of the value >= 0 check below.
          // availability is indexed by is_client (0 = server, 1 = client).
332       static const struct {
333         const char* channel_arg_name;
334         grpc_chttp2_setting_id setting_id;
335         grpc_integer_options integer_options;
336         bool availability[2] /* server, client */;
337       } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
338                            GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
339                            {-1, 0, INT32_MAX},
340                            {true, false}},
341                           {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
342                            GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
343                            {-1, 0, INT32_MAX},
344                            {true, true}},
345                           {GRPC_ARG_MAX_METADATA_SIZE,
346                            GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
347                            {-1, 0, INT32_MAX},
348                            {true, true}},
349                           {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
350                            GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
351                            {-1, 16384, 16777215},
352                            {true, true}},
353                           {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
354                            GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
355                            {1, 0, 1},
356                            {true, true}},
357                           {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
358                            GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
359                            {-1, 5, INT32_MAX},
360                            {true, true}}};
361       for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
362         if (0 == strcmp(channel_args->args[i].key,
363                         settings_map[j].channel_arg_name)) {
364           if (!settings_map[j].availability[is_client]) {
365             gpr_log(GPR_DEBUG, "%s is not available on %s",
366                     settings_map[j].channel_arg_name,
367                     is_client ? "clients" : "servers");
368           } else {
369             int value = grpc_channel_arg_get_integer(
370                 &channel_args->args[i], settings_map[j].integer_options);
371             if (value >= 0) {
372               queue_setting_update(t, settings_map[j].setting_id,
373                                    static_cast<uint32_t>(value));
374             }
375           }
              // At most one table entry matches a given key.
376           break;
377         }
378       }
379     }
380   }
381   if (channelz_enabled) {
382     // TODO(ncteisen): add an API to endpoint to query for local addr, and pass
383     // it in here, so SocketNode knows its own address.
        // socket_name is allocated by gpr_asprintf and handed to the
        // SocketNode wrapped in a UniquePtr; the first argument is passed as
        // an empty UniquePtr.
384     char* socket_name = nullptr;
385     gpr_asprintf(&socket_name, "%s %s", get_vtable()->name, t->peer_string);
386     t->channelz_socket =
387         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
388             grpc_core::UniquePtr<char>(),
389             grpc_core::UniquePtr<char>(gpr_strdup(t->peer_string)),
390             grpc_core::UniquePtr<char>(socket_name));
391   }
392   return enable_bdp;
393 }
394
/* Initialises the transport's long-lived closures (read, reclaimers, ping
   retry, BDP ping and keepalive callbacks). Every closure is bound to `t` as
   its argument and uses the scheduler obtained from
   grpc_combiner_scheduler(t->combiner). */
395 static void init_transport_closures(grpc_chttp2_transport* t) {
396   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
397                     grpc_combiner_scheduler(t->combiner));
398   GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
399                     grpc_combiner_scheduler(t->combiner));
400   GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
401                     destructive_reclaimer_locked, t,
402                     grpc_combiner_scheduler(t->combiner));
403   GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
404                     t, grpc_combiner_scheduler(t->combiner));
405   GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
406                     grpc_combiner_scheduler(t->combiner));
407   GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
408                     grpc_combiner_scheduler(t->combiner));
409   GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
410                     next_bdp_ping_timer_expired_locked, t,
411                     grpc_combiner_scheduler(t->combiner));
412   GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
413                     t, grpc_combiner_scheduler(t->combiner));
414   GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
415                     start_keepalive_ping_locked, t,
416                     grpc_combiner_scheduler(t->combiner));
417   GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
418                     finish_keepalive_ping_locked, t,
419                     grpc_combiner_scheduler(t->combiner));
420   GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
421                     keepalive_watchdog_fired_locked, t,
422                     grpc_combiner_scheduler(t->combiner));
423 }
424
/* Seeds t->keepalive_time, t->keepalive_timeout and
   t->keepalive_permit_without_calls from the client or server globals,
   depending on t->is_client. A default of INT_MAX is stored as
   GRPC_MILLIS_INF_FUTURE. The constructor calls this before
   read_channel_args(), so channel args may override the values. */
425 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
426   if (t->is_client) {
427     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
428                             ? GRPC_MILLIS_INF_FUTURE
429                             : g_default_client_keepalive_time_ms;
430     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
431                                ? GRPC_MILLIS_INF_FUTURE
432                                : g_default_client_keepalive_timeout_ms;
433     t->keepalive_permit_without_calls =
434         g_default_client_keepalive_permit_without_calls;
435   } else {
436     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
437                             ? GRPC_MILLIS_INF_FUTURE
438                             : g_default_server_keepalive_time_ms;
439     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
440                                ? GRPC_MILLIS_INF_FUTURE
441                                : g_default_server_keepalive_timeout_ms;
442     t->keepalive_permit_without_calls =
443         g_default_server_keepalive_permit_without_calls;
444   }
445 }
446
/* Copies the file-scope ping-policy defaults into t->ping_policy. The
   constructor calls this before read_channel_args(), so channel args may
   override the values. */
447 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
448   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
449   t->ping_policy.min_sent_ping_interval_without_data =
450       g_default_min_sent_ping_interval_without_data_ms;
451   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
452   t->ping_policy.min_recv_ping_interval_without_data =
453       g_default_min_recv_ping_interval_without_data_ms;
454 }
455
/* Arms the first keepalive timer when keepalive_time is finite; otherwise
   marks keepalive as disabled. When the timer is armed, a transport ref
   ("init keepalive ping") is taken first and the timer is set to fire
   keepalive_time from now with the init_keepalive_ping_locked closure. */
456 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
457   if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
458     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
459     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
460     grpc_timer_init(&t->keepalive_ping_timer,
461                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
462                     &t->init_keepalive_ping_locked);
463   } else {
464     /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
465        inflight keepalive timers */
466     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
467   }
468 }
469
/* Constructs a transport over endpoint `ep`.
   channel_args  - optional (may be null); applied via read_channel_args()
                   after the built-in defaults.
   ep            - endpoint stored in the transport; it is destroyed by the
                   destructor.
   is_client     - selects the initial stream id (1 for clients, 2 for
                   servers), the initial deframer state, and whether the
                   client connect string is queued in outbuf.
   resource_user - stored as-is.
   The constructor finishes by initiating the initial write and posting the
   benign reclaimer. */
470 grpc_chttp2_transport::grpc_chttp2_transport(
471     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
472     grpc_resource_user* resource_user)
473     : refs(1, &grpc_trace_chttp2_refcount),
474       ep(ep),
475       peer_string(grpc_endpoint_get_peer(ep)),
476       resource_user(resource_user),
477       combiner(grpc_combiner_create()),
478       is_client(is_client),
479       next_stream_id(is_client ? 1 : 2),
480       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
481   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
482              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
483   base.vtable = get_vtable();
484   /* 8 is a random stab in the dark as to a good initial size: it's small enough
485      that it shouldn't waste memory for infrequently used connections, yet
486      large enough that the exponential growth should happen nicely when it's
487      needed.
488      TODO(ctiller): tune this */
489   grpc_chttp2_stream_map_init(&stream_map, 8);
490
491   grpc_slice_buffer_init(&read_buffer);
492   grpc_connectivity_state_init(
493       &channel_callback.state_tracker, GRPC_CHANNEL_READY,
494       is_client ? "client_transport" : "server_transport");
495   grpc_slice_buffer_init(&outbuf);
      /* Clients queue the connect string as the first bytes of outbuf. */
496   if (is_client) {
497     grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
498                                        GRPC_CHTTP2_CLIENT_CONNECT_STRING));
499   }
500   grpc_chttp2_hpack_compressor_init(&hpack_compressor);
501   grpc_slice_buffer_init(&qbuf);
502   /* copy in initial settings to all setting sets */
503   size_t i;
504   int j;
505   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
506     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
507       settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
508     }
509   }
510   grpc_chttp2_hpack_parser_init(&hpack_parser);
511   grpc_chttp2_goaway_parser_init(&goaway_parser);
512
513   init_transport_closures(this);
514
515   /* configure http2 the way we like it */
516   if (is_client) {
517     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
518     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
519   }
520   queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
521                        DEFAULT_MAX_HEADER_LIST_SIZE);
522   queue_setting_update(this,
523                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
524
      /* Install defaults first; read_channel_args() below may override them. */
525   configure_transport_ping_policy(this);
526   init_transport_keepalive_settings(this);
527
528   bool enable_bdp = true;
529   if (channel_args) {
530     enable_bdp = read_channel_args(this, channel_args, is_client);
531   }
532
      /* BDP probing is forced off when flow control is globally disabled. */
533   if (g_flow_control_enabled) {
534     flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
535                                                                enable_bdp);
536   } else {
537     flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
538     enable_bdp = false;
539   }
540
541   /* No pings allowed before receiving a header or data frame. */
542   ping_state.pings_before_data_required = 0;
543   ping_state.is_delayed_ping_timer_set = false;
544   ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
545
546   ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
547   ping_recv_state.ping_strikes = 0;
548
549   init_keepalive_pings_if_enabled(this);
550
      /* A transport ref ("bdp_ping") is taken before the BDP ping is
         scheduled. */
551   if (enable_bdp) {
552     GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping");
553     schedule_bdp_ping_locked(this);
554     grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
555                                       nullptr);
556   }
557
558   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
559   post_benign_reclaimer(this);
560 }
561
/* Closure body scheduled by destroy_transport(): marks the transport as
   destroying, closes it with a "Transport destroyed" error annotated with the
   current write_state, then drops the "destroy" transport ref.
   tp    - the grpc_chttp2_transport being destroyed.
   error - not used by this function. */
562 static void destroy_transport_locked(void* tp, grpc_error* error) {
563   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
564   t->destroying = 1;
565   close_transport_locked(
566       t, grpc_error_set_int(
567              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
568              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
569   // Must be the last line.
570   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
571 }
572
/* Schedules destroy_transport_locked() for `gt` on the transport's combiner
   scheduler, using a freshly created closure and GRPC_ERROR_NONE. The actual
   teardown happens in that closure, not in this call. */
573 static void destroy_transport(grpc_transport* gt) {
574   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
575   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
576                                          grpc_combiner_scheduler(t->combiner)),
577                      GRPC_ERROR_NONE);
578 }
579
/* Closes the transport with `error`.
   Always ends all calls and cancels pings (each receives its own ref on
   `error`). If the transport has not been closed before, it is either
   closed now, or the close is deferred when a write is still in progress:
     - deferred: the error is recorded as a child of
       t->close_transport_on_writes_finished and the function returns early;
     - immediate: closed_with_error is set, connectivity moves to SHUTDOWN,
       outstanding ping/BDP/keepalive timers are cancelled, the writable
       stream list is drained and the endpoint is shut down.
   On the non-deferred paths, a pending notify_on_receive_settings closure is
   scheduled with GRPC_ERROR_CANCELLED and the incoming `error` ref is
   released. */
580 static void close_transport_locked(grpc_chttp2_transport* t,
581                                    grpc_error* error) {
582   end_all_the_calls(t, GRPC_ERROR_REF(error));
583   cancel_pings(t, GRPC_ERROR_REF(error));
584   if (t->closed_with_error == GRPC_ERROR_NONE) {
        // Errors without a clear gRPC status are reported as UNAVAILABLE.
585     if (!grpc_error_has_clear_grpc_status(error)) {
586       error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
587                                  GRPC_STATUS_UNAVAILABLE);
588     }
589     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
590       if (t->close_transport_on_writes_finished == nullptr) {
591         t->close_transport_on_writes_finished =
592             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
593                 "Delayed close due to in-progress write");
594       }
          // NOTE(review): `error` is not unreffed on this early-return path;
          // presumably grpc_error_add_child takes ownership of it - confirm.
          // notify_on_receive_settings is also left untouched here.
595       t->close_transport_on_writes_finished =
596           grpc_error_add_child(t->close_transport_on_writes_finished, error);
597       return;
598     }
599     GPR_ASSERT(error != GRPC_ERROR_NONE);
600     t->closed_with_error = GRPC_ERROR_REF(error);
601     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
602     if (t->ping_state.is_delayed_ping_timer_set) {
603       grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
604     }
605     if (t->have_next_bdp_ping_timer) {
606       grpc_timer_cancel(&t->next_bdp_ping_timer);
607     }
        // Cancel whichever keepalive timers the current state has armed.
608     switch (t->keepalive_state) {
609       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
610         grpc_timer_cancel(&t->keepalive_ping_timer);
611         break;
612       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
613         grpc_timer_cancel(&t->keepalive_ping_timer);
614         grpc_timer_cancel(&t->keepalive_watchdog_timer);
615         break;
616       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
617       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
618         /* keepalive timers are not set in these two states */
619         break;
620     }
621
622     /* flush writable stream list to avoid dangling references */
623     grpc_chttp2_stream* s;
624     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
625       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
626     }
627     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
628     grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
629   }
630   if (t->notify_on_receive_settings != nullptr) {
631     GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
632     t->notify_on_receive_settings = nullptr;
633   }
634   GRPC_ERROR_UNREF(error);
635 }
636
/* Stream ref/unref helpers that forward to grpc_stream_ref/grpc_stream_unref
   on s->refcount. Builds without NDEBUG take an extra `reason` string and
   pass it through; NDEBUG builds use the reason-less signatures. */
637 #ifndef NDEBUG
638 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
639   grpc_stream_ref(s->refcount, reason);
640 }
641 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
642   grpc_stream_unref(s->refcount, reason);
643 }
644 #else
645 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
646   grpc_stream_ref(s->refcount);
647 }
648 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
649   grpc_stream_unref(s->refcount);
650 }
651 #endif
652
/* Takes one stream ref ("chttp2") on `s` and one transport ref ("stream") on
   s->t. The stream constructor lists `reffer(this)` after `t` and `refcount`
   in its initializer list. */
653 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
654   /* We reserve one 'active stream' that's dropped when the stream is
655      read-closed. The others are for Chttp2IncomingByteStreams that are
656      actively reading */
657   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
658   GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
659 }
660
/* Constructs a stream on transport `t`.
   t           - owning transport.
   refcount    - stream refcount used by the ref/unref helpers.
   server_data - when non-null, its pointer value is used as the stream id
                 and the stream is registered with the transport at once;
                 when null, no id is assigned here.
   arena       - passed to both incoming metadata buffers. */
661 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
662                                        grpc_stream_refcount* refcount,
663                                        const void* server_data,
664                                        grpc_core::Arena* arena)
665     : t(t),
666       refcount(refcount),
667       reffer(this),
668       metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
669                       grpc_chttp2_incoming_metadata_buffer(arena)} {
670   if (server_data) {
        // Publish the new stream through t->accepting_stream, add it to the
        // stream map under its id, and post the destructive reclaimer.
671     id = static_cast<uint32_t>((uintptr_t)server_data);
672     *t->accepting_stream = this;
673     grpc_chttp2_stream_map_add(&t->stream_map, id, this);
674     post_destructive_reclaimer(t);
675   }
      // Pick the stream-level flow control implementation that matches the
      // transport's; the enabled variant is linked to the transport's
      // TransportFlowControl.
676   if (t->flow_control->flow_control_enabled()) {
677     flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
678         static_cast<grpc_core::chttp2::TransportFlowControl*>(
679             t->flow_control.get()),
680         this);
681   } else {
682     flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
683   }
684
685   grpc_slice_buffer_init(&frame_storage);
686   grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
687   grpc_slice_buffer_init(&flow_controlled_buffer);
688
      // Both per-stream closures use the transport combiner's scheduler; the
      // `::` prefix selects the file-scope functions rather than the members
      // of the same name.
689   GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
690                     grpc_combiner_scheduler(t->combiner));
691   GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this,
692                     grpc_combiner_scheduler(t->combiner));
693 }
694
695 grpc_chttp2_stream::~grpc_chttp2_stream() {
696   if (t->channelz_socket != nullptr) {
697     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
698       t->channelz_socket->RecordStreamSucceeded();
699     } else {
700       t->channelz_socket->RecordStreamFailed();
701     }
702   }
703
704   GPR_ASSERT((write_closed && read_closed) || id == 0);
705   if (id != 0) {
706     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
707   }
708
709   grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
710   grpc_slice_buffer_destroy_internal(&frame_storage);
711   if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
712     grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
713   }
714   if (stream_decompression_method !=
715       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
716     grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
717   }
718
719   grpc_chttp2_list_remove_stalled_by_transport(t, this);
720   grpc_chttp2_list_remove_stalled_by_stream(t, this);
721
722   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
723     if (GPR_UNLIKELY(included[i])) {
724       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
725               t->is_client ? "client" : "server", id, i);
726       abort();
727     }
728   }
729
730   GPR_ASSERT(send_initial_metadata_finished == nullptr);
731   GPR_ASSERT(fetching_send_message == nullptr);
732   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
733   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
734   GPR_ASSERT(recv_message_ready == nullptr);
735   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
736   grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
737   GRPC_ERROR_UNREF(read_closed_error);
738   GRPC_ERROR_UNREF(write_closed_error);
739   GRPC_ERROR_UNREF(byte_stream_error);
740
741   flow_control.Destroy();
742
743   if (t->resource_user != nullptr) {
744     grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
745   }
746
747   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
748   GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
749 }
750
751 static int init_stream(grpc_transport* gt, grpc_stream* gs,
752                        grpc_stream_refcount* refcount, const void* server_data,
753                        grpc_core::Arena* arena) {
754   GPR_TIMER_SCOPE("init_stream", 0);
755   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
756   new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
757   return 0;
758 }
759
760 static void destroy_stream_locked(void* sp, grpc_error* error) {
761   GPR_TIMER_SCOPE("destroy_stream", 0);
762   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
763   s->~grpc_chttp2_stream();
764 }
765
766 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
767                            grpc_closure* then_schedule_closure) {
768   GPR_TIMER_SCOPE("destroy_stream", 0);
769   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
770   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
771   if (s->stream_compression_method !=
772           GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
773       s->stream_compression_ctx != nullptr) {
774     grpc_stream_compression_context_destroy(s->stream_compression_ctx);
775     s->stream_compression_ctx = nullptr;
776   }
777   if (s->stream_decompression_method !=
778           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
779       s->stream_decompression_ctx != nullptr) {
780     grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
781     s->stream_decompression_ctx = nullptr;
782   }
783
784   s->destroy_stream_arg = then_schedule_closure;
785   GRPC_CLOSURE_SCHED(
786       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
787                         grpc_combiner_scheduler(t->combiner)),
788       GRPC_ERROR_NONE);
789 }
790
791 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
792                                                       uint32_t id) {
793   if (t->channel_callback.accept_stream == nullptr) {
794     return nullptr;
795   }
796   // Don't accept the stream if memory quota doesn't allow. Note that we should
797   // simply refuse the stream here instead of canceling the stream after it's
798   // accepted since the latter will create the call which costs much memory.
799   if (t->resource_user != nullptr &&
800       !grpc_resource_user_safe_alloc(t->resource_user,
801                                      GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
802     gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
803     grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
804                                              nullptr);
805     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
806     return nullptr;
807   }
808   grpc_chttp2_stream* accepting = nullptr;
809   GPR_ASSERT(t->accepting_stream == nullptr);
810   t->accepting_stream = &accepting;
811   t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
812                                     &t->base,
813                                     (void*)static_cast<uintptr_t>(id));
814   t->accepting_stream = nullptr;
815   return accepting;
816 }
817
818 /*******************************************************************************
819  * OUTPUT PROCESSING
820  */
821
822 static const char* write_state_name(grpc_chttp2_write_state st) {
823   switch (st) {
824     case GRPC_CHTTP2_WRITE_STATE_IDLE:
825       return "IDLE";
826     case GRPC_CHTTP2_WRITE_STATE_WRITING:
827       return "WRITING";
828     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
829       return "WRITING+MORE";
830   }
831   GPR_UNREACHABLE_CODE(return "UNKNOWN");
832 }
833
834 static void set_write_state(grpc_chttp2_transport* t,
835                             grpc_chttp2_write_state st, const char* reason) {
836   GRPC_CHTTP2_IF_TRACING(
837       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
838               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
839               write_state_name(t->write_state), write_state_name(st), reason));
840   t->write_state = st;
841   /* If the state is being reset back to idle, it means a write was just
842    * finished. Make sure all the run_after_write closures are scheduled.
843    *
844    * This is also our chance to close the transport if the transport was marked
845    * to be closed after all writes finish (for example, if we received a go-away
846    * from peer while we had some pending writes) */
847   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
848     GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
849     if (t->close_transport_on_writes_finished != nullptr) {
850       grpc_error* err = t->close_transport_on_writes_finished;
851       t->close_transport_on_writes_finished = nullptr;
852       close_transport_locked(t, err);
853     }
854   }
855 }
856
857 static void inc_initiate_write_reason(
858     grpc_chttp2_initiate_write_reason reason) {
859   switch (reason) {
860     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
861       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
862       break;
863     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
864       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
865       break;
866     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
867       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
868       break;
869     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
870       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
871       break;
872     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
873       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
874       break;
875     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
876       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
877       break;
878     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
879       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
880       break;
881     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
882       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
883       break;
884     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
885       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
886       break;
887     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
888       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
889       break;
890     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
891       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
892       break;
893     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
894       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
895       break;
896     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
897       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
898       break;
899     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
900       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
901       break;
902     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
903       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
904       break;
905     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
906       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
907       break;
908     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
909       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
910       break;
911     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
912       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
913       break;
914     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
915       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
916       break;
917     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
918       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
919       break;
920   }
921 }
922
923 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
924                                 grpc_chttp2_initiate_write_reason reason) {
925   GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
926
927   switch (t->write_state) {
928     case GRPC_CHTTP2_WRITE_STATE_IDLE:
929       inc_initiate_write_reason(reason);
930       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
931                       grpc_chttp2_initiate_write_reason_string(reason));
932       t->is_first_write_in_batch = true;
933       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
934       /* Note that the 'write_action_begin_locked' closure is being scheduled
935        * on the 'finally_scheduler' of t->combiner. This means that
936        * 'write_action_begin_locked' is called only *after* all the other
937        * closures (some of which are potentially initiating more writes on the
938        * transport) are executed on the t->combiner.
939        *
940        * The reason for scheduling on finally_scheduler is to make sure we batch
941        * as many writes as possible. 'write_action_begin_locked' is the function
942        * that gathers all the relevant bytes (which are at various places in the
943        * grpc_chttp2_transport structure) and append them to 'outbuf' field in
944        * grpc_chttp2_transport thereby batching what would have been potentially
945        * multiple write operations.
946        *
947        * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
948        * It does not call the endpoint to write the bytes. That is done by the
949        * 'write_action' (which is scheduled by 'write_action_begin_locked') */
950       GRPC_CLOSURE_SCHED(
951           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
952                             write_action_begin_locked, t,
953                             grpc_combiner_finally_scheduler(t->combiner)),
954           GRPC_ERROR_NONE);
955       break;
956     case GRPC_CHTTP2_WRITE_STATE_WRITING:
957       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
958                       grpc_chttp2_initiate_write_reason_string(reason));
959       break;
960     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
961       break;
962   }
963 }
964
965 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
966                                       grpc_chttp2_stream* s) {
967   if (t->closed_with_error == GRPC_ERROR_NONE &&
968       grpc_chttp2_list_add_writable_stream(t, s)) {
969     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
970   }
971 }
972
973 static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
974                                                bool early_results_scheduled,
975                                                bool partial_write) {
976   // If we're already in a background poller, don't offload this to an executor
977   if (grpc_iomgr_is_any_background_poller_thread()) {
978     return grpc_schedule_on_exec_ctx;
979   }
980   /* if it's not the first write in a batch, always offload to the executor:
981      we'll probably end up queuing against the kernel anyway, so we'll likely
982      get better latency overall if we switch writing work elsewhere and continue
983      with application work above */
984   if (!t->is_first_write_in_batch) {
985     return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
986   }
987   /* equivalently, if it's a partial write, we *know* we're going to be taking a
988      thread jump to write it because of the above, may as well do so
989      immediately */
990   if (partial_write) {
991     return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
992   }
993   switch (t->opt_target) {
994     case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
995       /* executor gives us the largest probability of being able to batch a
996        * write with others on this transport */
997       return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
998     case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
999       return grpc_schedule_on_exec_ctx;
1000   }
1001   GPR_UNREACHABLE_CODE(return nullptr);
1002 }
1003
1004 #define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
/* Describes how a write is being started, for write-state tracing.
 * |partial| tells whether only part of the pending data is being written;
 * |inlined| tells whether the write runs on the current thread rather than in
 * the background. */
static const char* begin_writing_desc(bool partial, bool inlined) {
  /* Indexed as [partial][inlined]. */
  static const char* const kDescriptions[2][2] = {
      {"begin write in background", "begin write in current thread"},
      {"begin partial write in background",
       "begin partial write in current thread"},
  };
  return kDescriptions[partial ? 1 : 0][inlined ? 1 : 0];
}
1018
1019 static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
1020   GPR_TIMER_SCOPE("write_action_begin_locked", 0);
1021   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1022   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
1023   grpc_chttp2_begin_write_result r;
1024   if (t->closed_with_error != GRPC_ERROR_NONE) {
1025     r.writing = false;
1026   } else {
1027     r = grpc_chttp2_begin_write(t);
1028   }
1029   if (r.writing) {
1030     if (r.partial) {
1031       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
1032     }
1033     if (!t->is_first_write_in_batch) {
1034       GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
1035     }
1036     grpc_closure_scheduler* scheduler =
1037         write_scheduler(t, r.early_results_scheduled, r.partial);
1038     if (scheduler != grpc_schedule_on_exec_ctx) {
1039       GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
1040     }
1041     set_write_state(
1042         t,
1043         r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1044                   : GRPC_CHTTP2_WRITE_STATE_WRITING,
1045         begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
1046     GRPC_CLOSURE_SCHED(
1047         GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
1048         GRPC_ERROR_NONE);
1049     if (t->reading_paused_on_pending_induced_frames) {
1050       GPR_ASSERT(t->num_pending_induced_frames == 0);
1051       /* We had paused reading, because we had many induced frames (SETTINGS
1052        * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1053        * been able to flush qbuf, we can resume reading. */
1054       GRPC_CHTTP2_IF_TRACING(gpr_log(
1055           GPR_INFO,
1056           "transport %p : Resuming reading after being paused due to too "
1057           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1058           t));
1059       t->reading_paused_on_pending_induced_frames = false;
1060       continue_read_action_locked(t);
1061     }
1062   } else {
1063     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
1064     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1065     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1066   }
1067 }
1068
1069 static void write_action(void* gt, grpc_error* error) {
1070   GPR_TIMER_SCOPE("write_action", 0);
1071   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1072   void* cl = t->cl;
1073   t->cl = nullptr;
1074   grpc_endpoint_write(
1075       t->ep, &t->outbuf,
1076       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
1077                         grpc_combiner_scheduler(t->combiner)),
1078       cl);
1079 }
1080
1081 /* Callback from the grpc_endpoint after bytes have been written by calling
1082  * sendmsg */
1083 static void write_action_end_locked(void* tp, grpc_error* error) {
1084   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1085   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1086
1087   bool closed = false;
1088   if (error != GRPC_ERROR_NONE) {
1089     close_transport_locked(t, GRPC_ERROR_REF(error));
1090     closed = true;
1091   }
1092
1093   if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1094     t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1095     closed = true;
1096     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1097       close_transport_locked(
1098           t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1099     }
1100   }
1101
1102   switch (t->write_state) {
1103     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1104       GPR_UNREACHABLE_CODE(break);
1105     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1106       GPR_TIMER_MARK("state=writing", 0);
1107       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1108       break;
1109     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1110       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1111       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1112       t->is_first_write_in_batch = false;
1113       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1114       // If the transport is closed, we will retry writing on the endpoint
1115       // and next write may contain part of the currently serialized frames.
1116       // So, we should only call the run_after_write callbacks when the next
1117       // write finishes, or the callbacks will be invoked when the stream is
1118       // closed.
1119       if (!closed) {
1120         GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
1121       }
1122       GRPC_CLOSURE_RUN(
1123           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1124                             write_action_begin_locked, t,
1125                             grpc_combiner_finally_scheduler(t->combiner)),
1126           GRPC_ERROR_NONE);
1127       break;
1128   }
1129
1130   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1131   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1132 }
1133
1134 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1135 // If the change needs to occur immediately, manually initiate a write.
1136 static void queue_setting_update(grpc_chttp2_transport* t,
1137                                  grpc_chttp2_setting_id id, uint32_t value) {
1138   const grpc_chttp2_setting_parameters* sp =
1139       &grpc_chttp2_settings_parameters[id];
1140   uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1141   if (use_value != value) {
1142     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1143             value, use_value);
1144   }
1145   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1146     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1147     t->dirtied_local_settings = 1;
1148   }
1149 }
1150
1151 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1152                                      uint32_t goaway_error,
1153                                      uint32_t last_stream_id,
1154                                      const grpc_slice& goaway_text) {
1155   // Discard the error from a previous goaway frame (if any)
1156   if (t->goaway_error != GRPC_ERROR_NONE) {
1157     GRPC_ERROR_UNREF(t->goaway_error);
1158   }
1159   t->goaway_error = grpc_error_set_str(
1160       grpc_error_set_int(
1161           grpc_error_set_int(
1162               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1163               GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1164           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1165       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1166
1167   GRPC_CHTTP2_IF_TRACING(
1168       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1169               last_stream_id));
1170   /* We want to log this irrespective of whether http tracing is enabled if we
1171    * received a GOAWAY with a non NO_ERROR code. */
1172   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1173     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
1174             goaway_error, grpc_error_string(t->goaway_error));
1175   }
1176   /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1177    * data equal to "too_many_pings", it should log the occurrence at a log level
1178    * that is enabled by default and double the configured KEEPALIVE_TIME used
1179    * for new connections on that channel. */
1180   if (GPR_UNLIKELY(t->is_client &&
1181                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1182                    grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1183     gpr_log(GPR_ERROR,
1184             "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1185             "data equal to \"too_many_pings\"");
1186     double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1187     t->keepalive_time =
1188         current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
1189             ? GRPC_MILLIS_INF_FUTURE
1190             : static_cast<grpc_millis>(current_keepalive_time_ms *
1191                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1192   }
1193
1194   /* lie: use transient failure from the transport to indicate goaway has been
1195    * received */
1196   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
1197 }
1198
1199 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1200   grpc_chttp2_stream* s;
1201   /* cancel out streams that haven't yet started if we have received a GOAWAY */
1202   if (t->goaway_error != GRPC_ERROR_NONE) {
1203     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1204       grpc_chttp2_cancel_stream(
1205           t, s,
1206           grpc_error_set_int(
1207               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1208               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1209     }
1210     return;
1211   }
1212   /* start streams where we have free grpc_chttp2_stream ids and free
1213    * concurrency */
1214   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1215          grpc_chttp2_stream_map_size(&t->stream_map) <
1216              t->settings[GRPC_PEER_SETTINGS]
1217                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1218          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1219     /* safe since we can't (legally) be parsing this stream yet */
1220     GRPC_CHTTP2_IF_TRACING(gpr_log(
1221         GPR_INFO,
1222         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1223         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1224
1225     GPR_ASSERT(s->id == 0);
1226     s->id = t->next_stream_id;
1227     t->next_stream_id += 2;
1228
1229     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1230       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1231                              "no_more_stream_ids");
1232     }
1233
1234     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1235     post_destructive_reclaimer(t);
1236     grpc_chttp2_mark_stream_writable(t, s);
1237     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1238   }
1239   /* cancel out streams that will never be started */
1240   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1241     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1242       grpc_chttp2_cancel_stream(
1243           t, s,
1244           grpc_error_set_int(
1245               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1246               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1247     }
1248   }
1249 }
1250
1251 /* Flag that this closure barrier may be covering a write in a pollset, and so
1252    we should not complete this closure until we can prove that the write got
1253    scheduled */
1254 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1255 /* First bit of the reference count, stored in the high order bits (with the low
1256    bits being used for flags defined above) */
1257 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1258
1259 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1260   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1261   return closure;
1262 }
1263
1264 static void null_then_sched_closure(grpc_closure** closure) {
1265   grpc_closure* c = *closure;
1266   *closure = nullptr;
1267   GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
1268 }
1269
1270 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1271                                        grpc_chttp2_stream* s,
1272                                        grpc_closure** pclosure,
1273                                        grpc_error* error, const char* desc) {
1274   grpc_closure* closure = *pclosure;
1275   *pclosure = nullptr;
1276   if (closure == nullptr) {
1277     GRPC_ERROR_UNREF(error);
1278     return;
1279   }
1280   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1281   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1282     const char* errstr = grpc_error_string(error);
1283     gpr_log(
1284         GPR_INFO,
1285         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1286         "write_state=%s",
1287         t, closure,
1288         static_cast<int>(closure->next_data.scratch /
1289                          CLOSURE_BARRIER_FIRST_REF_BIT),
1290         static_cast<int>(closure->next_data.scratch %
1291                          CLOSURE_BARRIER_FIRST_REF_BIT),
1292         desc, errstr, write_state_name(t->write_state));
1293   }
1294   if (error != GRPC_ERROR_NONE) {
1295     if (closure->error_data.error == GRPC_ERROR_NONE) {
1296       closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1297           "Error in HTTP transport completing operation");
1298       closure->error_data.error = grpc_error_set_str(
1299           closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1300           grpc_slice_from_copied_string(t->peer_string));
1301     }
1302     closure->error_data.error =
1303         grpc_error_add_child(closure->error_data.error, error);
1304   }
1305   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1306     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1307         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1308       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1309       // closures earlier than when it is safe to do so.
1310       GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
1311     } else {
1312       grpc_closure_list_append(&t->run_after_write, closure,
1313                                closure->error_data.error);
1314     }
1315   }
1316 }
1317
1318 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1319   if (batch->idx.named.grpc_status != nullptr) {
1320     return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1321                                         GRPC_MDELEM_GRPC_STATUS_0);
1322   }
1323   return false;
1324 }
1325
1326 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1327                                                   grpc_chttp2_stream* s) {
1328   if (s->id != 0 && (!s->write_buffering ||
1329                      s->flow_controlled_buffer.length > t->write_buffer_size)) {
1330     grpc_chttp2_mark_stream_writable(t, s);
1331     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1332   }
1333 }
1334
1335 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1336                                      grpc_chttp2_stream* s) {
1337   s->fetched_send_message_length +=
1338       static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1339   grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1340   maybe_become_writable_due_to_send_msg(t, s);
1341 }
1342
1343 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1344                                           grpc_chttp2_stream* s) {
1345   for (;;) {
1346     if (s->fetching_send_message == nullptr) {
1347       /* Stream was cancelled before message fetch completed */
1348       abort(); /* TODO(ctiller): what cleanup here? */
1349       return;  /* early out */
1350     }
1351     if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1352       int64_t notify_offset = s->next_message_end_offset;
1353       if (notify_offset <= s->flow_controlled_bytes_written) {
1354         grpc_chttp2_complete_closure_step(
1355             t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1356             "fetching_send_message_finished");
1357       } else {
1358         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1359         if (cb == nullptr) {
1360           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1361         } else {
1362           t->write_cb_pool = cb->next;
1363         }
1364         cb->call_at_byte = notify_offset;
1365         cb->closure = s->fetching_send_message_finished;
1366         s->fetching_send_message_finished = nullptr;
1367         grpc_chttp2_write_cb** list =
1368             s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1369                 ? &s->on_write_finished_cbs
1370                 : &s->on_flow_controlled_cbs;
1371         cb->next = *list;
1372         *list = cb;
1373       }
1374       s->fetching_send_message.reset();
1375       return; /* early out */
1376     } else if (s->fetching_send_message->Next(UINT32_MAX,
1377                                               &s->complete_fetch_locked)) {
1378       grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1379       if (error != GRPC_ERROR_NONE) {
1380         s->fetching_send_message.reset();
1381         grpc_chttp2_cancel_stream(t, s, error);
1382       } else {
1383         add_fetched_slice_locked(t, s);
1384       }
1385     }
1386   }
1387 }
1388
1389 static void complete_fetch_locked(void* gs, grpc_error* error) {
1390   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1391   grpc_chttp2_transport* t = s->t;
1392   if (error == GRPC_ERROR_NONE) {
1393     error = s->fetching_send_message->Pull(&s->fetching_slice);
1394     if (error == GRPC_ERROR_NONE) {
1395       add_fetched_slice_locked(t, s);
1396       continue_fetching_send_locked(t, s);
1397     }
1398   }
1399   if (error != GRPC_ERROR_NONE) {
1400     s->fetching_send_message.reset();
1401     grpc_chttp2_cancel_stream(t, s, error);
1402   }
1403 }
1404
1405 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1406                          bool is_client, bool is_initial) {
1407   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1408        md = md->next) {
1409     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1410     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1411     gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1412             is_client ? "CLI" : "SVR", key, value);
1413     gpr_free(key);
1414     gpr_free(value);
1415   }
1416 }
1417
1418 static void perform_stream_op_locked(void* stream_op,
1419                                      grpc_error* error_ignored) {
1420   GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1421
1422   grpc_transport_stream_op_batch* op =
1423       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1424   grpc_chttp2_stream* s =
1425       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1426   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1427   grpc_chttp2_transport* t = s->t;
1428
1429   GRPC_STATS_INC_HTTP2_OP_BATCHES();
1430
1431   s->context = op->payload->context;
1432   s->traced = op->is_traced;
1433   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1434     char* str = grpc_transport_stream_op_batch_string(op);
1435     gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
1436             op->on_complete);
1437     gpr_free(str);
1438     if (op->send_initial_metadata) {
1439       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1440                    s->id, t->is_client, true);
1441     }
1442     if (op->send_trailing_metadata) {
1443       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1444                    s->id, t->is_client, false);
1445     }
1446   }
1447
1448   grpc_closure* on_complete = op->on_complete;
1449   // on_complete will be null if and only if there are no send ops in the batch.
1450   if (on_complete != nullptr) {
1451     // This batch has send ops. Use final_data as a barrier until enqueue time;
1452     // the initial counter is dropped at the end of this function.
1453     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1454     on_complete->error_data.error = GRPC_ERROR_NONE;
1455   }
1456
1457   if (op->cancel_stream) {
1458     GRPC_STATS_INC_HTTP2_OP_CANCEL();
1459     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1460   }
1461
1462   if (op->send_initial_metadata) {
1463     if (t->is_client && t->channelz_socket != nullptr) {
1464       t->channelz_socket->RecordStreamStartedFromLocal();
1465     }
1466     GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1467     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1468     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1469
1470     /* Identify stream compression */
1471     if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1472                 .content_encoding == nullptr ||
1473         grpc_stream_compression_method_parse(
1474             GRPC_MDVALUE(
1475                 op_payload->send_initial_metadata.send_initial_metadata->idx
1476                     .named.content_encoding->md),
1477             true, &s->stream_compression_method) == 0) {
1478       s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1479     }
1480     if (s->stream_compression_method !=
1481         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1482       s->uncompressed_data_size = 0;
1483       s->stream_compression_ctx = nullptr;
1484       grpc_slice_buffer_init(&s->compressed_data_buffer);
1485     }
1486     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1487     s->send_initial_metadata =
1488         op_payload->send_initial_metadata.send_initial_metadata;
1489     const size_t metadata_size =
1490         grpc_metadata_batch_size(s->send_initial_metadata);
1491     const size_t metadata_peer_limit =
1492         t->settings[GRPC_PEER_SETTINGS]
1493                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1494     if (t->is_client) {
1495       s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1496     }
1497     if (metadata_size > metadata_peer_limit) {
1498       grpc_chttp2_cancel_stream(
1499           t, s,
1500           grpc_error_set_int(
1501               grpc_error_set_int(
1502                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1503                                          "to-be-sent initial metadata size "
1504                                          "exceeds peer limit"),
1505                                      GRPC_ERROR_INT_SIZE,
1506                                      static_cast<intptr_t>(metadata_size)),
1507                   GRPC_ERROR_INT_LIMIT,
1508                   static_cast<intptr_t>(metadata_peer_limit)),
1509               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1510     } else {
1511       if (contains_non_ok_status(s->send_initial_metadata)) {
1512         s->seen_error = true;
1513       }
1514       if (!s->write_closed) {
1515         if (t->is_client) {
1516           if (t->closed_with_error == GRPC_ERROR_NONE) {
1517             GPR_ASSERT(s->id == 0);
1518             grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1519             maybe_start_some_streams(t);
1520           } else {
1521             grpc_chttp2_cancel_stream(
1522                 t, s,
1523                 grpc_error_set_int(
1524                     GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1525                         "Transport closed", &t->closed_with_error, 1),
1526                     GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1527           }
1528         } else {
1529           GPR_ASSERT(s->id != 0);
1530           grpc_chttp2_mark_stream_writable(t, s);
1531           if (!(op->send_message &&
1532                 (op->payload->send_message.send_message->flags() &
1533                  GRPC_WRITE_BUFFER_HINT))) {
1534             grpc_chttp2_initiate_write(
1535                 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1536           }
1537         }
1538       } else {
1539         s->send_initial_metadata = nullptr;
1540         grpc_chttp2_complete_closure_step(
1541             t, s, &s->send_initial_metadata_finished,
1542             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1543                 "Attempt to send initial metadata after stream was closed",
1544                 &s->write_closed_error, 1),
1545             "send_initial_metadata_finished");
1546       }
1547     }
1548     if (op_payload->send_initial_metadata.peer_string != nullptr) {
1549       gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1550                         (gpr_atm)t->peer_string);
1551     }
1552   }
1553
1554   if (op->send_message) {
1555     GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1556     t->num_messages_in_next_write++;
1557     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1558         op->payload->send_message.send_message->length());
1559     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1560     s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1561     if (s->write_closed) {
1562       op->payload->send_message.stream_write_closed = true;
1563       // We should NOT return an error here, so as to avoid a cancel OP being
1564       // started. The surface layer will notice that the stream has been closed
1565       // for writes and fail the send message op.
1566       op->payload->send_message.send_message.reset();
1567       grpc_chttp2_complete_closure_step(
1568           t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1569           "fetching_send_message_finished");
1570     } else {
1571       GPR_ASSERT(s->fetching_send_message == nullptr);
1572       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1573           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1574       uint32_t flags = op_payload->send_message.send_message->flags();
1575       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1576       size_t len = op_payload->send_message.send_message->length();
1577       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1578       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1579       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1580       frame_hdr[4] = static_cast<uint8_t>(len);
1581       s->fetching_send_message =
1582           std::move(op_payload->send_message.send_message);
1583       s->fetched_send_message_length = 0;
1584       s->next_message_end_offset =
1585           s->flow_controlled_bytes_written +
1586           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1587           static_cast<int64_t>(len);
1588       if (flags & GRPC_WRITE_BUFFER_HINT) {
1589         s->next_message_end_offset -= t->write_buffer_size;
1590         s->write_buffering = true;
1591       } else {
1592         s->write_buffering = false;
1593       }
1594       continue_fetching_send_locked(t, s);
1595       maybe_become_writable_due_to_send_msg(t, s);
1596     }
1597   }
1598
1599   if (op->send_trailing_metadata) {
1600     GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1601     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1602     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1603     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1604     s->send_trailing_metadata =
1605         op_payload->send_trailing_metadata.send_trailing_metadata;
1606     s->write_buffering = false;
1607     const size_t metadata_size =
1608         grpc_metadata_batch_size(s->send_trailing_metadata);
1609     const size_t metadata_peer_limit =
1610         t->settings[GRPC_PEER_SETTINGS]
1611                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1612     if (metadata_size > metadata_peer_limit) {
1613       grpc_chttp2_cancel_stream(
1614           t, s,
1615           grpc_error_set_int(
1616               grpc_error_set_int(
1617                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1618                                          "to-be-sent trailing metadata size "
1619                                          "exceeds peer limit"),
1620                                      GRPC_ERROR_INT_SIZE,
1621                                      static_cast<intptr_t>(metadata_size)),
1622                   GRPC_ERROR_INT_LIMIT,
1623                   static_cast<intptr_t>(metadata_peer_limit)),
1624               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1625     } else {
1626       if (contains_non_ok_status(s->send_trailing_metadata)) {
1627         s->seen_error = true;
1628       }
1629       if (s->write_closed) {
1630         s->send_trailing_metadata = nullptr;
1631         grpc_chttp2_complete_closure_step(
1632             t, s, &s->send_trailing_metadata_finished,
1633             grpc_metadata_batch_is_empty(
1634                 op->payload->send_trailing_metadata.send_trailing_metadata)
1635                 ? GRPC_ERROR_NONE
1636                 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1637                       "Attempt to send trailing metadata after "
1638                       "stream was closed"),
1639             "send_trailing_metadata_finished");
1640       } else if (s->id != 0) {
1641         /* TODO(ctiller): check if there's flow control for any outstanding
1642            bytes before going writable */
1643         grpc_chttp2_mark_stream_writable(t, s);
1644         grpc_chttp2_initiate_write(
1645             t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1646       }
1647     }
1648   }
1649
1650   if (op->recv_initial_metadata) {
1651     GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1652     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1653     s->recv_initial_metadata_ready =
1654         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1655     s->recv_initial_metadata =
1656         op_payload->recv_initial_metadata.recv_initial_metadata;
1657     s->trailing_metadata_available =
1658         op_payload->recv_initial_metadata.trailing_metadata_available;
1659     if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1660       gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1661                         (gpr_atm)t->peer_string);
1662     }
1663     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1664   }
1665
1666   if (op->recv_message) {
1667     GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1668     size_t before = 0;
1669     GPR_ASSERT(s->recv_message_ready == nullptr);
1670     GPR_ASSERT(!s->pending_byte_stream);
1671     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1672     s->recv_message = op_payload->recv_message.recv_message;
1673     if (s->id != 0) {
1674       if (!s->read_closed) {
1675         before = s->frame_storage.length +
1676                  s->unprocessed_incoming_frames_buffer.length;
1677       }
1678     }
1679     grpc_chttp2_maybe_complete_recv_message(t, s);
1680     if (s->id != 0) {
1681       if (!s->read_closed && s->frame_storage.length == 0) {
1682         size_t after = s->frame_storage.length +
1683                        s->unprocessed_incoming_frames_buffer_cached_length;
1684         s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1685                                                   before - after);
1686         grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1687       }
1688     }
1689   }
1690
1691   if (op->recv_trailing_metadata) {
1692     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1693     GPR_ASSERT(s->collecting_stats == nullptr);
1694     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1695     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1696     s->recv_trailing_metadata_finished =
1697         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1698     s->recv_trailing_metadata =
1699         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1700     s->final_metadata_requested = true;
1701     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1702   }
1703
1704   if (on_complete != nullptr) {
1705     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1706                                       "op->on_complete");
1707   }
1708
1709   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1710 }
1711
1712 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1713                               grpc_transport_stream_op_batch* op) {
1714   GPR_TIMER_SCOPE("perform_stream_op", 0);
1715   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1716   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1717
1718   if (!t->is_client) {
1719     if (op->send_initial_metadata) {
1720       grpc_millis deadline =
1721           op->payload->send_initial_metadata.send_initial_metadata->deadline;
1722       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1723     }
1724     if (op->send_trailing_metadata) {
1725       grpc_millis deadline =
1726           op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1727       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1728     }
1729   }
1730
1731   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1732     char* str = grpc_transport_stream_op_batch_string(op);
1733     gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
1734     gpr_free(str);
1735   }
1736
1737   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1738   op->handler_private.extra_arg = gs;
1739   GRPC_CLOSURE_SCHED(
1740       GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
1741                         op, grpc_combiner_scheduler(t->combiner)),
1742       GRPC_ERROR_NONE);
1743 }
1744
1745 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1746   /* callback remaining pings: they're not allowed to call into the transport,
1747      and maybe they hold resources that need to be freed */
1748   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1749   GPR_ASSERT(error != GRPC_ERROR_NONE);
1750   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1751     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1752     GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
1753   }
1754   GRPC_ERROR_UNREF(error);
1755 }
1756
1757 static void send_ping_locked(grpc_chttp2_transport* t,
1758                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1759   if (t->closed_with_error != GRPC_ERROR_NONE) {
1760     GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
1761     GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
1762     return;
1763   }
1764   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1765   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1766                            GRPC_ERROR_NONE);
1767   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1768                            GRPC_ERROR_NONE);
1769 }
1770
1771 /*
1772  * Specialized form of send_ping_locked for keepalive ping. If there is already
1773  * a ping in progress, the keepalive ping would piggyback onto that ping,
1774  * instead of waiting for that ping to complete and then starting a new ping.
1775  */
1776 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1777   if (t->closed_with_error != GRPC_ERROR_NONE) {
1778     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
1779                      GRPC_ERROR_REF(t->closed_with_error));
1780     GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
1781                      GRPC_ERROR_REF(t->closed_with_error));
1782     return;
1783   }
1784   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1785   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1786     /* There is a ping in flight. Add yourself to the inflight closure list. */
1787     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1788     grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1789                              &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1790     return;
1791   }
1792   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1793                            &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1794   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
1795                            &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1796 }
1797
1798 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1799   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1800   t->ping_state.is_delayed_ping_timer_set = false;
1801   if (error == GRPC_ERROR_NONE) {
1802     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1803   }
1804   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1805 }
1806
1807 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1808   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1809   if (pq->inflight_id != id) {
1810     char* from = grpc_endpoint_get_peer(t->ep);
1811     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1812     gpr_free(from);
1813     return;
1814   }
1815   GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1816   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1817     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1818   }
1819 }
1820
1821 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1822   /* We want to log this irrespective of whether http tracing is enabled */
1823   gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
1824           grpc_error_string(error));
1825   t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1826   grpc_http2_error_code http_error;
1827   grpc_slice slice;
1828   grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1829                         &http_error, nullptr);
1830   grpc_chttp2_goaway_append(t->last_new_stream_id,
1831                             static_cast<uint32_t>(http_error),
1832                             grpc_slice_ref_internal(slice), &t->qbuf);
1833   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1834   GRPC_ERROR_UNREF(error);
1835 }
1836
1837 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1838   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1839       t->ping_policy.max_ping_strikes != 0) {
1840     send_goaway(t,
1841                 grpc_error_set_int(
1842                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1843                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1844     /*The transport will be closed after the write is done */
1845     close_transport_locked(
1846         t, grpc_error_set_int(
1847                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1848                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1849   }
1850 }
1851
1852 static void perform_transport_op_locked(void* stream_op,
1853                                         grpc_error* error_ignored) {
1854   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1855   grpc_chttp2_transport* t =
1856       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1857
1858   if (op->goaway_error) {
1859     send_goaway(t, op->goaway_error);
1860   }
1861
1862   if (op->set_accept_stream) {
1863     t->channel_callback.accept_stream = op->set_accept_stream_fn;
1864     t->channel_callback.accept_stream_user_data =
1865         op->set_accept_stream_user_data;
1866   }
1867
1868   if (op->bind_pollset) {
1869     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1870   }
1871
1872   if (op->bind_pollset_set) {
1873     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1874   }
1875
1876   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1877     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1878     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1879   }
1880
1881   if (op->on_connectivity_state_change != nullptr) {
1882     grpc_connectivity_state_notify_on_state_change(
1883         &t->channel_callback.state_tracker, op->connectivity_state,
1884         op->on_connectivity_state_change);
1885   }
1886
1887   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1888     close_transport_locked(t, op->disconnect_with_error);
1889   }
1890
1891   GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
1892
1893   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1894 }
1895
1896 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1897   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1898   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1899     char* msg = grpc_transport_op_string(op);
1900     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
1901     gpr_free(msg);
1902   }
1903   op->handler_private.extra_arg = gt;
1904   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1905   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1906                                        perform_transport_op_locked, op,
1907                                        grpc_combiner_scheduler(t->combiner)),
1908                      GRPC_ERROR_NONE);
1909 }
1910
1911 /*******************************************************************************
1912  * INPUT PROCESSING - GENERAL
1913  */
1914
1915 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1916                                                       grpc_chttp2_stream* s) {
1917   if (s->recv_initial_metadata_ready != nullptr &&
1918       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1919     if (s->seen_error) {
1920       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1921       if (!s->pending_byte_stream) {
1922         grpc_slice_buffer_reset_and_unref_internal(
1923             &s->unprocessed_incoming_frames_buffer);
1924       }
1925     }
1926     grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1927                                                  s->recv_initial_metadata);
1928     null_then_sched_closure(&s->recv_initial_metadata_ready);
1929   }
1930 }
1931
1932 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
1933                                              grpc_chttp2_stream* s) {
1934   grpc_error* error = GRPC_ERROR_NONE;
1935   if (s->recv_message_ready != nullptr) {
1936     *s->recv_message = nullptr;
1937     if (s->final_metadata_requested && s->seen_error) {
1938       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1939       if (!s->pending_byte_stream) {
1940         grpc_slice_buffer_reset_and_unref_internal(
1941             &s->unprocessed_incoming_frames_buffer);
1942       }
1943     }
1944     if (!s->pending_byte_stream) {
1945       while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1946              s->frame_storage.length > 0) {
1947         if (s->unprocessed_incoming_frames_buffer.length == 0) {
1948           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1949                                  &s->frame_storage);
1950           s->unprocessed_incoming_frames_decompressed = false;
1951         }
1952         if (!s->unprocessed_incoming_frames_decompressed &&
1953             s->stream_decompression_method !=
1954                 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1955           GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1956           bool end_of_context;
1957           if (!s->stream_decompression_ctx) {
1958             s->stream_decompression_ctx =
1959                 grpc_stream_compression_context_create(
1960                     s->stream_decompression_method);
1961           }
1962           if (!grpc_stream_decompress(
1963                   s->stream_decompression_ctx,
1964                   &s->unprocessed_incoming_frames_buffer,
1965                   &s->decompressed_data_buffer, nullptr,
1966                   GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1967                   &end_of_context)) {
1968             grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1969             grpc_slice_buffer_reset_and_unref_internal(
1970                 &s->unprocessed_incoming_frames_buffer);
1971             error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1972                 "Stream decompression error.");
1973           } else {
1974             s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1975             if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1976               s->decompressed_header_bytes = 0;
1977             }
1978             error = grpc_deframe_unprocessed_incoming_frames(
1979                 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1980                 s->recv_message);
1981             if (end_of_context) {
1982               grpc_stream_compression_context_destroy(
1983                   s->stream_decompression_ctx);
1984               s->stream_decompression_ctx = nullptr;
1985             }
1986           }
1987         } else {
1988           error = grpc_deframe_unprocessed_incoming_frames(
1989               &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1990               nullptr, s->recv_message);
1991         }
1992         if (error != GRPC_ERROR_NONE) {
1993           s->seen_error = true;
1994           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1995           grpc_slice_buffer_reset_and_unref_internal(
1996               &s->unprocessed_incoming_frames_buffer);
1997           break;
1998         } else if (*s->recv_message != nullptr) {
1999           break;
2000         }
2001       }
2002     }
2003     // save the length of the buffer before handing control back to application
2004     // threads. Needed to support correct flow control bookkeeping
2005     s->unprocessed_incoming_frames_buffer_cached_length =
2006         s->unprocessed_incoming_frames_buffer.length;
2007     if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
2008       null_then_sched_closure(&s->recv_message_ready);
2009     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2010       *s->recv_message = nullptr;
2011       null_then_sched_closure(&s->recv_message_ready);
2012     }
2013     GRPC_ERROR_UNREF(error);
2014   }
2015 }
2016
2017 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2018                                                        grpc_chttp2_stream* s) {
2019   grpc_chttp2_maybe_complete_recv_message(t, s);
2020   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2021       s->write_closed) {
2022     if (s->seen_error || !t->is_client) {
2023       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
2024       if (!s->pending_byte_stream) {
2025         grpc_slice_buffer_reset_and_unref_internal(
2026             &s->unprocessed_incoming_frames_buffer);
2027       }
2028     }
2029     bool pending_data = s->pending_byte_stream ||
2030                         s->unprocessed_incoming_frames_buffer.length > 0;
2031     if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
2032         !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
2033       /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
2034        * maybe decompress the next 5 bytes in the stream. */
2035       if (s->stream_decompression_method ==
2036           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
2037         grpc_slice_buffer_move_first(
2038             &s->frame_storage,
2039             GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
2040             &s->unprocessed_incoming_frames_buffer);
2041         if (s->unprocessed_incoming_frames_buffer.length > 0) {
2042           s->unprocessed_incoming_frames_decompressed = true;
2043           pending_data = true;
2044         }
2045       } else {
2046         bool end_of_context;
2047         if (!s->stream_decompression_ctx) {
2048           s->stream_decompression_ctx = grpc_stream_compression_context_create(
2049               s->stream_decompression_method);
2050         }
2051         if (!grpc_stream_decompress(
2052                 s->stream_decompression_ctx, &s->frame_storage,
2053                 &s->unprocessed_incoming_frames_buffer, nullptr,
2054                 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
2055           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
2056           grpc_slice_buffer_reset_and_unref_internal(
2057               &s->unprocessed_incoming_frames_buffer);
2058           s->seen_error = true;
2059         } else {
2060           if (s->unprocessed_incoming_frames_buffer.length > 0) {
2061             s->unprocessed_incoming_frames_decompressed = true;
2062             pending_data = true;
2063           }
2064           if (end_of_context) {
2065             grpc_stream_compression_context_destroy(
2066                 s->stream_decompression_ctx);
2067             s->stream_decompression_ctx = nullptr;
2068           }
2069         }
2070       }
2071     }
2072     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
2073         s->recv_trailing_metadata_finished != nullptr) {
2074       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2075       s->collecting_stats = nullptr;
2076       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2077                                                    s->recv_trailing_metadata);
2078       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2079     }
2080   }
2081 }
2082
2083 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2084                           grpc_error* error) {
2085   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2086       grpc_chttp2_stream_map_delete(&t->stream_map, id));
2087   GPR_DEBUG_ASSERT(s);
2088   if (t->incoming_stream == s) {
2089     t->incoming_stream = nullptr;
2090     grpc_chttp2_parsing_become_skip_parser(t);
2091   }
2092   if (s->pending_byte_stream) {
2093     if (s->on_next != nullptr) {
2094       grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2095       if (error == GRPC_ERROR_NONE) {
2096         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2097       }
2098       bs->PublishError(error);
2099       bs->Unref();
2100       s->data_parser.parsing_frame = nullptr;
2101     } else {
2102       GRPC_ERROR_UNREF(s->byte_stream_error);
2103       s->byte_stream_error = GRPC_ERROR_REF(error);
2104     }
2105   }
2106
2107   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2108     post_benign_reclaimer(t);
2109     if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2110       close_transport_locked(
2111           t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2112                  "Last stream closed after sending GOAWAY", &error, 1));
2113     }
2114   }
2115   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2116     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2117   }
2118
2119   GRPC_ERROR_UNREF(error);
2120
2121   maybe_start_some_streams(t);
2122 }
2123
2124 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2125                                grpc_error* due_to_error) {
2126   if (!t->is_client && !s->sent_trailing_metadata &&
2127       grpc_error_has_clear_grpc_status(due_to_error)) {
2128     close_from_api(t, s, due_to_error);
2129     return;
2130   }
2131
2132   if (!s->read_closed || !s->write_closed) {
2133     if (s->id != 0) {
2134       grpc_http2_error_code http_error;
2135       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2136                             &http_error, nullptr);
2137       grpc_chttp2_add_rst_stream_to_next_write(
2138           t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2139       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2140     }
2141   }
2142   if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2143     s->seen_error = true;
2144   }
2145   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2146 }
2147
2148 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2149                              grpc_error* error) {
2150   grpc_status_code status;
2151   grpc_slice slice;
2152   grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2153   if (status != GRPC_STATUS_OK) {
2154     s->seen_error = true;
2155   }
2156   /* stream_global->recv_trailing_metadata_finished gives us a
2157      last chance replacement: we've received trailing metadata,
2158      but something more important has become available to signal
2159      to the upper layers - drop what we've got, and then publish
2160      what we want - which is safe because we haven't told anyone
2161      about the metadata yet */
2162   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2163       s->recv_trailing_metadata_finished != nullptr) {
2164     char status_string[GPR_LTOA_MIN_BUFSIZE];
2165     gpr_ltoa(status, status_string);
2166     GRPC_LOG_IF_ERROR("add_status",
2167                       grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2168                           &s->metadata_buffer[1],
2169                           grpc_mdelem_from_slices(
2170                               GRPC_MDSTR_GRPC_STATUS,
2171                               grpc_core::UnmanagedMemorySlice(status_string))));
2172     if (!GRPC_SLICE_IS_EMPTY(slice)) {
2173       GRPC_LOG_IF_ERROR(
2174           "add_status_message",
2175           grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2176               &s->metadata_buffer[1],
2177               grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2178     }
2179     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2180     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2181   }
2182
2183   GRPC_ERROR_UNREF(error);
2184 }
2185
2186 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2187   if (error == GRPC_ERROR_NONE) return;
2188   for (size_t i = 0; i < *nrefs; i++) {
2189     if (error == refs[i]) {
2190       return;
2191     }
2192   }
2193   refs[*nrefs] = error;
2194   ++*nrefs;
2195 }
2196
2197 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2198                                  const char* master_error_msg) {
2199   grpc_error* refs[3];
2200   size_t nrefs = 0;
2201   add_error(s->read_closed_error, refs, &nrefs);
2202   add_error(s->write_closed_error, refs, &nrefs);
2203   add_error(extra_error, refs, &nrefs);
2204   grpc_error* error = GRPC_ERROR_NONE;
2205   if (nrefs > 0) {
2206     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2207                                                              refs, nrefs);
2208   }
2209   GRPC_ERROR_UNREF(extra_error);
2210   return error;
2211 }
2212
2213 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2214                              grpc_chttp2_write_cb** list, grpc_error* error) {
2215   while (*list) {
2216     grpc_chttp2_write_cb* cb = *list;
2217     *list = cb->next;
2218     grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2219                                       "on_write_finished_cb");
2220     cb->next = t->write_cb_pool;
2221     t->write_cb_pool = cb;
2222   }
2223   GRPC_ERROR_UNREF(error);
2224 }
2225
2226 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2227                                      grpc_chttp2_stream* s, grpc_error* error) {
2228   error =
2229       removal_error(error, s, "Pending writes failed due to stream closure");
2230   s->send_initial_metadata = nullptr;
2231   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2232                                     GRPC_ERROR_REF(error),
2233                                     "send_initial_metadata_finished");
2234
2235   s->send_trailing_metadata = nullptr;
2236   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2237                                     GRPC_ERROR_REF(error),
2238                                     "send_trailing_metadata_finished");
2239
2240   s->fetching_send_message.reset();
2241   grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2242                                     GRPC_ERROR_REF(error),
2243                                     "fetching_send_message_finished");
2244   flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2245   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2246 }
2247
2248 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2249                                     grpc_chttp2_stream* s, int close_reads,
2250                                     int close_writes, grpc_error* error) {
2251   if (s->read_closed && s->write_closed) {
2252     /* already closed */
2253     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2254     GRPC_ERROR_UNREF(error);
2255     return;
2256   }
2257   bool closed_read = false;
2258   bool became_closed = false;
2259   if (close_reads && !s->read_closed) {
2260     s->read_closed_error = GRPC_ERROR_REF(error);
2261     s->read_closed = true;
2262     closed_read = true;
2263   }
2264   if (close_writes && !s->write_closed) {
2265     s->write_closed_error = GRPC_ERROR_REF(error);
2266     s->write_closed = true;
2267     grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2268   }
2269   if (s->read_closed && s->write_closed) {
2270     became_closed = true;
2271     grpc_error* overall_error =
2272         removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2273     if (s->id != 0) {
2274       remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2275     } else {
2276       /* Purge streams waiting on concurrency still waiting for id assignment */
2277       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2278     }
2279     if (overall_error != GRPC_ERROR_NONE) {
2280       grpc_chttp2_fake_status(t, s, overall_error);
2281     }
2282   }
2283   if (closed_read) {
2284     for (int i = 0; i < 2; i++) {
2285       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2286         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2287       }
2288     }
2289     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2290     grpc_chttp2_maybe_complete_recv_message(t, s);
2291   }
2292   if (became_closed) {
2293     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2294     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2295   }
2296   GRPC_ERROR_UNREF(error);
2297 }
2298
2299 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2300                            grpc_error* error) {
2301   grpc_slice hdr;
2302   grpc_slice status_hdr;
2303   grpc_slice http_status_hdr;
2304   grpc_slice content_type_hdr;
2305   grpc_slice message_pfx;
2306   uint8_t* p;
2307   uint32_t len = 0;
2308   grpc_status_code grpc_status;
2309   grpc_slice slice;
2310   grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2311                         nullptr);
2312
2313   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2314
2315   /* Hand roll a header block.
2316      This is unnecessarily ugly - at some point we should find a more
2317      elegant solution.
2318      It's complicated by the fact that our send machinery would be dead by
2319      the time we got around to sending this, so instead we ignore HPACK
2320      compression and just write the uncompressed bytes onto the wire. */
2321   if (!s->sent_initial_metadata) {
2322     http_status_hdr = GRPC_SLICE_MALLOC(13);
2323     p = GRPC_SLICE_START_PTR(http_status_hdr);
2324     *p++ = 0x00;
2325     *p++ = 7;
2326     *p++ = ':';
2327     *p++ = 's';
2328     *p++ = 't';
2329     *p++ = 'a';
2330     *p++ = 't';
2331     *p++ = 'u';
2332     *p++ = 's';
2333     *p++ = 3;
2334     *p++ = '2';
2335     *p++ = '0';
2336     *p++ = '0';
2337     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2338     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2339
2340     content_type_hdr = GRPC_SLICE_MALLOC(31);
2341     p = GRPC_SLICE_START_PTR(content_type_hdr);
2342     *p++ = 0x00;
2343     *p++ = 12;
2344     *p++ = 'c';
2345     *p++ = 'o';
2346     *p++ = 'n';
2347     *p++ = 't';
2348     *p++ = 'e';
2349     *p++ = 'n';
2350     *p++ = 't';
2351     *p++ = '-';
2352     *p++ = 't';
2353     *p++ = 'y';
2354     *p++ = 'p';
2355     *p++ = 'e';
2356     *p++ = 16;
2357     *p++ = 'a';
2358     *p++ = 'p';
2359     *p++ = 'p';
2360     *p++ = 'l';
2361     *p++ = 'i';
2362     *p++ = 'c';
2363     *p++ = 'a';
2364     *p++ = 't';
2365     *p++ = 'i';
2366     *p++ = 'o';
2367     *p++ = 'n';
2368     *p++ = '/';
2369     *p++ = 'g';
2370     *p++ = 'r';
2371     *p++ = 'p';
2372     *p++ = 'c';
2373     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2374     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2375   }
2376
2377   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2378   p = GRPC_SLICE_START_PTR(status_hdr);
2379   *p++ = 0x00; /* literal header, not indexed */
2380   *p++ = 11;   /* len(grpc-status) */
2381   *p++ = 'g';
2382   *p++ = 'r';
2383   *p++ = 'p';
2384   *p++ = 'c';
2385   *p++ = '-';
2386   *p++ = 's';
2387   *p++ = 't';
2388   *p++ = 'a';
2389   *p++ = 't';
2390   *p++ = 'u';
2391   *p++ = 's';
2392   if (grpc_status < 10) {
2393     *p++ = 1;
2394     *p++ = static_cast<uint8_t>('0' + grpc_status);
2395   } else {
2396     *p++ = 2;
2397     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2398     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2399   }
2400   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2401   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2402
2403   size_t msg_len = GRPC_SLICE_LENGTH(slice);
2404   GPR_ASSERT(msg_len <= UINT32_MAX);
2405   uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2406   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2407   p = GRPC_SLICE_START_PTR(message_pfx);
2408   *p++ = 0x00; /* literal header, not indexed */
2409   *p++ = 12;   /* len(grpc-message) */
2410   *p++ = 'g';
2411   *p++ = 'r';
2412   *p++ = 'p';
2413   *p++ = 'c';
2414   *p++ = '-';
2415   *p++ = 'm';
2416   *p++ = 'e';
2417   *p++ = 's';
2418   *p++ = 's';
2419   *p++ = 'a';
2420   *p++ = 'g';
2421   *p++ = 'e';
2422   GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2423   p += msg_len_len;
2424   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2425   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2426   len += static_cast<uint32_t>(msg_len);
2427
2428   hdr = GRPC_SLICE_MALLOC(9);
2429   p = GRPC_SLICE_START_PTR(hdr);
2430   *p++ = static_cast<uint8_t>(len >> 16);
2431   *p++ = static_cast<uint8_t>(len >> 8);
2432   *p++ = static_cast<uint8_t>(len);
2433   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2434   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2435   *p++ = static_cast<uint8_t>(s->id >> 24);
2436   *p++ = static_cast<uint8_t>(s->id >> 16);
2437   *p++ = static_cast<uint8_t>(s->id >> 8);
2438   *p++ = static_cast<uint8_t>(s->id);
2439   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2440
2441   grpc_slice_buffer_add(&t->qbuf, hdr);
2442   if (!s->sent_initial_metadata) {
2443     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2444     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2445   }
2446   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2447   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2448   grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2449   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2450                                            &s->stats.outgoing);
2451
2452   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2453   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2454 }
2455
2456 typedef struct {
2457   grpc_error* error;
2458   grpc_chttp2_transport* t;
2459 } cancel_stream_cb_args;
2460
2461 static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
2462   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2463   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2464   grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2465 }
2466
2467 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2468   intptr_t http2_error;
2469   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2470   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2471       !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2472     error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2473                                GRPC_STATUS_UNAVAILABLE);
2474   }
2475   cancel_stream_cb_args args = {error, t};
2476   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2477   GRPC_ERROR_UNREF(error);
2478 }
2479
2480 /*******************************************************************************
2481  * INPUT PROCESSING - PARSING
2482  */
2483
2484 template <class F>
2485 static void WithUrgency(grpc_chttp2_transport* t,
2486                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2487                         grpc_chttp2_initiate_write_reason reason, F action) {
2488   switch (urgency) {
2489     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2490       break;
2491     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2492       grpc_chttp2_initiate_write(t, reason);
2493     // fallthrough
2494     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2495       action();
2496       break;
2497   }
2498 }
2499
2500 void grpc_chttp2_act_on_flowctl_action(
2501     const grpc_core::chttp2::FlowControlAction& action,
2502     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2503   WithUrgency(t, action.send_stream_update(),
2504               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2505               [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2506   WithUrgency(t, action.send_transport_update(),
2507               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2508   WithUrgency(t, action.send_initial_window_update(),
2509               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2510                 queue_setting_update(t,
2511                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2512                                      action.initial_window_size());
2513               });
2514   WithUrgency(t, action.send_max_frame_size_update(),
2515               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2516                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2517                                      action.max_frame_size());
2518               });
2519 }
2520
2521 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2522   grpc_http_parser parser;
2523   size_t i = 0;
2524   grpc_error* error = GRPC_ERROR_NONE;
2525   grpc_http_response response;
2526
2527   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2528
2529   grpc_error* parse_error = GRPC_ERROR_NONE;
2530   for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2531     parse_error =
2532         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2533   }
2534   if (parse_error == GRPC_ERROR_NONE &&
2535       (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2536     error = grpc_error_set_int(
2537         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2538                                "Trying to connect an http1.x server"),
2539                            GRPC_ERROR_INT_HTTP_STATUS, response.status),
2540         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2541   }
2542   GRPC_ERROR_UNREF(parse_error);
2543
2544   grpc_http_parser_destroy(&parser);
2545   grpc_http_response_destroy(&response);
2546   return error;
2547 }
2548
2549 static void read_action_locked(void* tp, grpc_error* error) {
2550   GPR_TIMER_SCOPE("reading_action_locked", 0);
2551
2552   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2553
2554   GRPC_ERROR_REF(error);
2555
2556   grpc_error* err = error;
2557   if (err != GRPC_ERROR_NONE) {
2558     err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2559                                  "Endpoint read failed", &err, 1),
2560                              GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2561                              t->write_state);
2562   }
2563   GPR_SWAP(grpc_error*, err, error);
2564   GRPC_ERROR_UNREF(err);
2565   if (t->closed_with_error == GRPC_ERROR_NONE) {
2566     GPR_TIMER_SCOPE("reading_action.parse", 0);
2567     size_t i = 0;
2568     grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2569                              GRPC_ERROR_NONE};
2570     for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2571       grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
2572       if (bdp_est) {
2573         bdp_est->AddIncomingBytes(
2574             static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
2575       }
2576       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2577     }
2578     if (errors[1] != GRPC_ERROR_NONE) {
2579       errors[2] = try_http_parsing(t);
2580       GRPC_ERROR_UNREF(error);
2581       error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2582           "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2583     }
2584     for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2585       GRPC_ERROR_UNREF(errors[i]);
2586     }
2587
2588     GPR_TIMER_SCOPE("post_parse_locked", 0);
2589     if (t->initial_window_update != 0) {
2590       if (t->initial_window_update > 0) {
2591         grpc_chttp2_stream* s;
2592         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2593           grpc_chttp2_mark_stream_writable(t, s);
2594           grpc_chttp2_initiate_write(
2595               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2596         }
2597       }
2598       t->initial_window_update = 0;
2599     }
2600   }
2601
2602   GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2603   bool keep_reading = false;
2604   if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2605     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2606         "Transport closed", &t->closed_with_error, 1);
2607   }
2608   if (error != GRPC_ERROR_NONE) {
2609     /* If a goaway frame was received, this might be the reason why the read
2610      * failed. Add this info to the error */
2611     if (t->goaway_error != GRPC_ERROR_NONE) {
2612       error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2613     }
2614
2615     close_transport_locked(t, GRPC_ERROR_REF(error));
2616     t->endpoint_reading = 0;
2617   } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2618     keep_reading = true;
2619     /* Since we have read a byte, reset the keepalive timer */
2620     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2621       grpc_timer_cancel(&t->keepalive_ping_timer);
2622     }
2623   }
2624   grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2625
2626   if (keep_reading) {
2627     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2628       t->reading_paused_on_pending_induced_frames = true;
2629       GRPC_CHTTP2_IF_TRACING(
2630           gpr_log(GPR_INFO,
2631                   "transport %p : Pausing reading due to too "
2632                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2633                   t));
2634     } else {
2635       continue_read_action_locked(t);
2636     }
2637   } else {
2638     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2639   }
2640
2641   GRPC_ERROR_UNREF(error);
2642 }
2643
2644 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2645   const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2646   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2647   grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2648 }
2649
2650 // t is reffed prior to calling the first time, and once the callback chain
2651 // that kicks off finishes, it's unreffed
2652 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2653   t->flow_control->bdp_estimator()->SchedulePing();
2654   send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
2655 }
2656
2657 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2658   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2659   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2660     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2661             grpc_error_string(error));
2662   }
2663   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2664     return;
2665   }
2666   /* Reset the keepalive ping timer */
2667   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2668     grpc_timer_cancel(&t->keepalive_ping_timer);
2669   }
2670   t->flow_control->bdp_estimator()->StartPing();
2671 }
2672
2673 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2674   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2675   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2676     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2677             grpc_error_string(error));
2678   }
2679   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2680     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2681     return;
2682   }
2683   grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2684   grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2685                                     nullptr);
2686   GPR_ASSERT(!t->have_next_bdp_ping_timer);
2687   t->have_next_bdp_ping_timer = true;
2688   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2689                   &t->next_bdp_ping_timer_expired_locked);
2690 }
2691
2692 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2693   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2694   GPR_ASSERT(t->have_next_bdp_ping_timer);
2695   t->have_next_bdp_ping_timer = false;
2696   if (error != GRPC_ERROR_NONE) {
2697     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2698     return;
2699   }
2700   schedule_bdp_ping_locked(t);
2701 }
2702
2703 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2704                                                bool is_client) {
2705   size_t i;
2706   if (args) {
2707     for (i = 0; i < args->num_args; i++) {
2708       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2709         const int value = grpc_channel_arg_get_integer(
2710             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2711                                        : g_default_server_keepalive_time_ms,
2712                              1, INT_MAX});
2713         if (is_client) {
2714           g_default_client_keepalive_time_ms = value;
2715         } else {
2716           g_default_server_keepalive_time_ms = value;
2717         }
2718       } else if (0 ==
2719                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2720         const int value = grpc_channel_arg_get_integer(
2721             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2722                                        : g_default_server_keepalive_timeout_ms,
2723                              0, INT_MAX});
2724         if (is_client) {
2725           g_default_client_keepalive_timeout_ms = value;
2726         } else {
2727           g_default_server_keepalive_timeout_ms = value;
2728         }
2729       } else if (0 == strcmp(args->args[i].key,
2730                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2731         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2732             &args->args[i],
2733             {is_client ? g_default_client_keepalive_permit_without_calls
2734                        : g_default_server_keepalive_timeout_ms,
2735              0, 1}));
2736         if (is_client) {
2737           g_default_client_keepalive_permit_without_calls = value;
2738         } else {
2739           g_default_server_keepalive_permit_without_calls = value;
2740         }
2741       } else if (0 ==
2742                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2743         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2744             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2745       } else if (0 == strcmp(args->args[i].key,
2746                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2747         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2748             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2749       } else if (0 ==
2750                  strcmp(
2751                      args->args[i].key,
2752                      GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2753         g_default_min_sent_ping_interval_without_data_ms =
2754             grpc_channel_arg_get_integer(
2755                 &args->args[i],
2756                 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2757       } else if (0 ==
2758                  strcmp(
2759                      args->args[i].key,
2760                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2761         g_default_min_recv_ping_interval_without_data_ms =
2762             grpc_channel_arg_get_integer(
2763                 &args->args[i],
2764                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2765       }
2766     }
2767   }
2768 }
2769
2770 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2771   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2772   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2773   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2774     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2775   } else if (error == GRPC_ERROR_NONE) {
2776     if (t->keepalive_permit_without_calls ||
2777         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2778       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2779       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2780       grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2781       send_keepalive_ping_locked(t);
2782       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2783     } else {
2784       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2785       grpc_timer_init(&t->keepalive_ping_timer,
2786                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2787                       &t->init_keepalive_ping_locked);
2788     }
2789   } else if (error == GRPC_ERROR_CANCELLED) {
2790     /* The keepalive ping timer may be cancelled by bdp */
2791     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2792     grpc_timer_init(&t->keepalive_ping_timer,
2793                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2794                     &t->init_keepalive_ping_locked);
2795   }
2796   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2797 }
2798
2799 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2800   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2801   if (error != GRPC_ERROR_NONE) {
2802     return;
2803   }
2804   if (t->channelz_socket != nullptr) {
2805     t->channelz_socket->RecordKeepaliveSent();
2806   }
2807   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2808     gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
2809   }
2810   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2811   grpc_timer_init(&t->keepalive_watchdog_timer,
2812                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2813                   &t->keepalive_watchdog_fired_locked);
2814 }
2815
2816 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2817   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2818   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2819     if (error == GRPC_ERROR_NONE) {
2820       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2821         gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
2822       }
2823       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2824       grpc_timer_cancel(&t->keepalive_watchdog_timer);
2825       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2826       grpc_timer_init(&t->keepalive_ping_timer,
2827                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2828                       &t->init_keepalive_ping_locked);
2829     }
2830   }
2831   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2832 }
2833
2834 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2835   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2836   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2837     if (error == GRPC_ERROR_NONE) {
2838       gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
2839               t->peer_string);
2840       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2841       close_transport_locked(
2842           t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2843                                     "keepalive watchdog timeout"),
2844                                 GRPC_ERROR_INT_GRPC_STATUS,
2845                                 GRPC_STATUS_UNAVAILABLE));
2846     }
2847   } else {
2848     /* The watchdog timer should have been cancelled by
2849      * finish_keepalive_ping_locked. */
2850     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2851       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2852               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2853     }
2854   }
2855   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2856 }
2857
2858 /*******************************************************************************
2859  * CALLBACK LOOP
2860  */
2861
2862 static void connectivity_state_set(grpc_chttp2_transport* t,
2863                                    grpc_connectivity_state state,
2864                                    const char* reason) {
2865   GRPC_CHTTP2_IF_TRACING(
2866       gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2867   grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
2868                               reason);
2869 }
2870
2871 /*******************************************************************************
2872  * POLLSET STUFF
2873  */
2874
2875 static void set_pollset(grpc_transport* gt, grpc_stream* gs,
2876                         grpc_pollset* pollset) {
2877   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2878   grpc_endpoint_add_to_pollset(t->ep, pollset);
2879 }
2880
2881 static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
2882                             grpc_pollset_set* pollset_set) {
2883   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2884   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2885 }
2886
2887 /*******************************************************************************
2888  * BYTE STREAM
2889  */
2890
2891 static void reset_byte_stream(void* arg, grpc_error* error) {
2892   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2893   s->pending_byte_stream = false;
2894   if (error == GRPC_ERROR_NONE) {
2895     grpc_chttp2_maybe_complete_recv_message(s->t, s);
2896     grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2897   } else {
2898     GPR_ASSERT(error != GRPC_ERROR_NONE);
2899     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
2900     s->on_next = nullptr;
2901     GRPC_ERROR_UNREF(s->byte_stream_error);
2902     s->byte_stream_error = GRPC_ERROR_NONE;
2903     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2904     s->byte_stream_error = GRPC_ERROR_REF(error);
2905   }
2906 }
2907
2908 namespace grpc_core {
2909
2910 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2911     grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2912     uint32_t frame_size, uint32_t flags)
2913     : ByteStream(frame_size, flags),
2914       transport_(transport),
2915       stream_(stream),
2916       refs_(2),
2917       remaining_bytes_(frame_size) {
2918   GRPC_ERROR_UNREF(stream->byte_stream_error);
2919   stream->byte_stream_error = GRPC_ERROR_NONE;
2920 }
2921
2922 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2923                                             grpc_error* error_ignored) {
2924   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2925   grpc_chttp2_stream* s = bs->stream_;
2926   grpc_chttp2_transport* t = s->t;
2927   bs->Unref();
2928   s->pending_byte_stream = false;
2929   grpc_chttp2_maybe_complete_recv_message(t, s);
2930   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2931 }
2932
2933 void Chttp2IncomingByteStream::Orphan() {
2934   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2935   GRPC_CLOSURE_SCHED(
2936       GRPC_CLOSURE_INIT(&destroy_action_,
2937                         &Chttp2IncomingByteStream::OrphanLocked, this,
2938                         grpc_combiner_scheduler(transport_->combiner)),
2939       GRPC_ERROR_NONE);
2940 }
2941
2942 void Chttp2IncomingByteStream::NextLocked(void* arg,
2943                                           grpc_error* error_ignored) {
2944   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2945   grpc_chttp2_transport* t = bs->transport_;
2946   grpc_chttp2_stream* s = bs->stream_;
2947   size_t cur_length = s->frame_storage.length;
2948   if (!s->read_closed) {
2949     s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2950                                               cur_length);
2951     grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2952   }
2953   GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2954   if (s->frame_storage.length > 0) {
2955     grpc_slice_buffer_swap(&s->frame_storage,
2956                            &s->unprocessed_incoming_frames_buffer);
2957     s->unprocessed_incoming_frames_decompressed = false;
2958     GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
2959   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2960     GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2961                        GRPC_ERROR_REF(s->byte_stream_error));
2962     if (s->data_parser.parsing_frame != nullptr) {
2963       s->data_parser.parsing_frame->Unref();
2964       s->data_parser.parsing_frame = nullptr;
2965     }
2966   } else if (s->read_closed) {
2967     if (bs->remaining_bytes_ != 0) {
2968       s->byte_stream_error =
2969           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2970       GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2971                          GRPC_ERROR_REF(s->byte_stream_error));
2972       if (s->data_parser.parsing_frame != nullptr) {
2973         s->data_parser.parsing_frame->Unref();
2974         s->data_parser.parsing_frame = nullptr;
2975       }
2976     } else {
2977       /* Should never reach here. */
2978       GPR_ASSERT(false);
2979     }
2980   } else {
2981     s->on_next = bs->next_action_.on_complete;
2982   }
2983   bs->Unref();
2984 }
2985
2986 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2987                                     grpc_closure* on_complete) {
2988   GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2989   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2990     return true;
2991   } else {
2992     Ref();
2993     next_action_.max_size_hint = max_size_hint;
2994     next_action_.on_complete = on_complete;
2995     GRPC_CLOSURE_SCHED(
2996         GRPC_CLOSURE_INIT(&next_action_.closure,
2997                           &Chttp2IncomingByteStream::NextLocked, this,
2998                           grpc_combiner_scheduler(transport_->combiner)),
2999         GRPC_ERROR_NONE);
3000     return false;
3001   }
3002 }
3003
3004 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3005   GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3006                    GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3007   if (!stream_->stream_decompression_ctx) {
3008     stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3009         stream_->stream_decompression_method);
3010   }
3011 }
3012
3013 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3014   GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3015   grpc_error* error;
3016   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3017     if (!stream_->unprocessed_incoming_frames_decompressed &&
3018         stream_->stream_decompression_method !=
3019             GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3020       bool end_of_context;
3021       MaybeCreateStreamDecompressionCtx();
3022       if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3023                                   &stream_->unprocessed_incoming_frames_buffer,
3024                                   &stream_->decompressed_data_buffer, nullptr,
3025                                   MAX_SIZE_T, &end_of_context)) {
3026         error =
3027             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3028         return error;
3029       }
3030       GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3031       grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3032                              &stream_->decompressed_data_buffer);
3033       stream_->unprocessed_incoming_frames_decompressed = true;
3034       if (end_of_context) {
3035         grpc_stream_compression_context_destroy(
3036             stream_->stream_decompression_ctx);
3037         stream_->stream_decompression_ctx = nullptr;
3038       }
3039       if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3040         *slice = grpc_empty_slice();
3041       }
3042     }
3043     error = grpc_deframe_unprocessed_incoming_frames(
3044         &stream_->data_parser, stream_,
3045         &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3046     if (error != GRPC_ERROR_NONE) {
3047       return error;
3048     }
3049   } else {
3050     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3051     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3052     return error;
3053   }
3054   return GRPC_ERROR_NONE;
3055 }
3056
3057 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3058   GPR_ASSERT(error != GRPC_ERROR_NONE);
3059   GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
3060   stream_->on_next = nullptr;
3061   GRPC_ERROR_UNREF(stream_->byte_stream_error);
3062   stream_->byte_stream_error = GRPC_ERROR_REF(error);
3063   grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3064 }
3065
3066 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3067                                            grpc_slice* slice_out) {
3068   if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3069     grpc_error* error =
3070         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3071     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3072     grpc_slice_unref_internal(slice);
3073     return error;
3074   } else {
3075     remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3076     if (slice_out != nullptr) {
3077       *slice_out = slice;
3078     }
3079     return GRPC_ERROR_NONE;
3080   }
3081 }
3082
3083 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3084                                                bool reset_on_error) {
3085   if (error == GRPC_ERROR_NONE) {
3086     if (remaining_bytes_ != 0) {
3087       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3088     }
3089   }
3090   if (error != GRPC_ERROR_NONE && reset_on_error) {
3091     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
3092   }
3093   Unref();
3094   return error;
3095 }
3096
3097 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3098   GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3099 }
3100
3101 }  // namespace grpc_core
3102
3103 /*******************************************************************************
3104  * RESOURCE QUOTAS
3105  */
3106
3107 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3108   if (!t->benign_reclaimer_registered) {
3109     t->benign_reclaimer_registered = true;
3110     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3111     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3112                                       false, &t->benign_reclaimer_locked);
3113   }
3114 }
3115
3116 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3117   if (!t->destructive_reclaimer_registered) {
3118     t->destructive_reclaimer_registered = true;
3119     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3120     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3121                                       true, &t->destructive_reclaimer_locked);
3122   }
3123 }
3124
3125 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3126   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3127   if (error == GRPC_ERROR_NONE &&
3128       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3129     /* Channel with no active streams: send a goaway to try and make it
3130      * disconnect cleanly */
3131     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3132       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3133               t->peer_string);
3134     }
3135     send_goaway(t,
3136                 grpc_error_set_int(
3137                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3138                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3139   } else if (error == GRPC_ERROR_NONE &&
3140              GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3141     gpr_log(GPR_INFO,
3142             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3143             " streams",
3144             t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3145   }
3146   t->benign_reclaimer_registered = false;
3147   if (error != GRPC_ERROR_CANCELLED) {
3148     grpc_resource_user_finish_reclamation(
3149         grpc_endpoint_get_resource_user(t->ep));
3150   }
3151   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3152 }
3153
3154 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3155   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3156   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3157   t->destructive_reclaimer_registered = false;
3158   if (error == GRPC_ERROR_NONE && n > 0) {
3159     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3160         grpc_chttp2_stream_map_rand(&t->stream_map));
3161     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3162       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3163               s->id);
3164     }
3165     grpc_chttp2_cancel_stream(
3166         t, s,
3167         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3168                            GRPC_ERROR_INT_HTTP2_ERROR,
3169                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3170     if (n > 1) {
3171       /* Since we cancel one stream per destructive reclamation, if
3172          there are more streams left, we can immediately post a new
3173          reclaimer in case the resource quota needs to free more
3174          memory */
3175       post_destructive_reclaimer(t);
3176     }
3177   }
3178   if (error != GRPC_ERROR_CANCELLED) {
3179     grpc_resource_user_finish_reclamation(
3180         grpc_endpoint_get_resource_user(t->ep));
3181   }
3182   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3183 }
3184
3185 /*******************************************************************************
3186  * MONITORING
3187  */
3188
3189 const char* grpc_chttp2_initiate_write_reason_string(
3190     grpc_chttp2_initiate_write_reason reason) {
3191   switch (reason) {
3192     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3193       return "INITIAL_WRITE";
3194     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3195       return "START_NEW_STREAM";
3196     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3197       return "SEND_MESSAGE";
3198     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3199       return "SEND_INITIAL_METADATA";
3200     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3201       return "SEND_TRAILING_METADATA";
3202     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3203       return "RETRY_SEND_PING";
3204     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3205       return "CONTINUE_PINGS";
3206     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3207       return "GOAWAY_SENT";
3208     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3209       return "RST_STREAM";
3210     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3211       return "CLOSE_FROM_API";
3212     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3213       return "STREAM_FLOW_CONTROL";
3214     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3215       return "TRANSPORT_FLOW_CONTROL";
3216     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3217       return "SEND_SETTINGS";
3218     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3219       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3220     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3221       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3222     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3223       return "APPLICATION_PING";
3224     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3225       return "KEEPALIVE_PING";
3226     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3227       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3228     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3229       return "PING_RESPONSE";
3230     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3231       return "FORCE_RST_STREAM";
3232   }
3233   GPR_UNREACHABLE_CODE(return "unknown");
3234 }
3235
3236 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3237   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3238 }
3239
3240 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3241                                              "chttp2",
3242                                              init_stream,
3243                                              set_pollset,
3244                                              set_pollset_set,
3245                                              perform_stream_op,
3246                                              perform_transport_op,
3247                                              destroy_stream,
3248                                              destroy_transport,
3249                                              chttp2_get_endpoint};
3250
3251 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3252
3253 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
3254 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3255   grpc_chttp2_transport* t =
3256       reinterpret_cast<grpc_chttp2_transport*>(transport);
3257   return t->channelz_socket;
3258 }
3259
3260 grpc_transport* grpc_create_chttp2_transport(
3261     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3262     grpc_resource_user* resource_user) {
3263   auto t = grpc_core::New<grpc_chttp2_transport>(channel_args, ep, is_client,
3264                                                  resource_user);
3265   return &t->base;
3266 }
3267
3268 void grpc_chttp2_transport_start_reading(
3269     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3270     grpc_closure* notify_on_receive_settings) {
3271   grpc_chttp2_transport* t =
3272       reinterpret_cast<grpc_chttp2_transport*>(transport);
3273   GRPC_CHTTP2_REF_TRANSPORT(
3274       t, "reading_action"); /* matches unref inside reading_action */
3275   if (read_buffer != nullptr) {
3276     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3277     gpr_free(read_buffer);
3278   }
3279   t->notify_on_receive_settings = notify_on_receive_settings;
3280   GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
3281 }