--- /dev/null
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <math.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/transport/chttp2/transport/context_list.h"
+#include "src/core/ext/transport/chttp2/transport/frame_data.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/ext/transport/chttp2/transport/varint.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/compression/stream_compression.h"
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/http2_errors.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_conversion.h"
+#include "src/core/lib/transport/timeout_encoding.h"
+#include "src/core/lib/transport/transport.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
+#define MAX_WINDOW 0x7fffffffu
+#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
+#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
+
+#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
+#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
+#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */
+#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
+#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
+#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
+
+#define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
+#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
+#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
+#define DEFAULT_MAX_PING_STRIKES 2
+
+#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
+
+static int g_default_client_keepalive_time_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
+static int g_default_client_keepalive_timeout_ms =
+ DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
+static int g_default_server_keepalive_time_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIME_MS;
+static int g_default_server_keepalive_timeout_ms =
+ DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
+static bool g_default_client_keepalive_permit_without_calls =
+ DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+static bool g_default_server_keepalive_permit_without_calls =
+ DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+
+static int g_default_min_sent_ping_interval_without_data_ms =
+ DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
+static int g_default_min_recv_ping_interval_without_data_ms =
+ DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
+static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
+static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
+
+#define MAX_CLIENT_STREAM_ID 0x7fffffffu
+grpc_core::TraceFlag grpc_http_trace(false, "http");
+grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
+ "chttp2_refcount");
+
+/* forward declarations of various callbacks that we'll build closures around */
+static void write_action_begin_locked(void* t, grpc_error* error);
+static void write_action(void* t, grpc_error* error);
+static void write_action_end_locked(void* t, grpc_error* error);
+
+static void read_action_locked(void* t, grpc_error* error);
+static void continue_read_action_locked(grpc_chttp2_transport* t);
+
+static void complete_fetch_locked(void* gs, grpc_error* error);
+/** Set a transport level setting, and push it to our peer */
+static void queue_setting_update(grpc_chttp2_transport* t,
+ grpc_chttp2_setting_id id, uint32_t value);
+
+static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_error* error);
+
+/** Start new streams that have been created if we can */
+static void maybe_start_some_streams(grpc_chttp2_transport* t);
+
+static void connectivity_state_set(grpc_chttp2_transport* t,
+ grpc_connectivity_state state,
+ const char* reason);
+
+static void benign_reclaimer_locked(void* t, grpc_error* error);
+static void destructive_reclaimer_locked(void* t, grpc_error* error);
+
+static void post_benign_reclaimer(grpc_chttp2_transport* t);
+static void post_destructive_reclaimer(grpc_chttp2_transport* t);
+
+static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
+static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
+
+static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
+static void start_bdp_ping_locked(void* tp, grpc_error* error);
+static void finish_bdp_ping_locked(void* tp, grpc_error* error);
+static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
+
+static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
+static void send_ping_locked(grpc_chttp2_transport* t,
+ grpc_closure* on_initiate,
+ grpc_closure* on_complete);
+static void retry_initiate_ping_locked(void* tp, grpc_error* error);
+
+/** keepalive-relevant functions */
+static void init_keepalive_ping_locked(void* arg, grpc_error* error);
+static void start_keepalive_ping_locked(void* arg, grpc_error* error);
+static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
+static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
+
+static void reset_byte_stream(void* arg, grpc_error* error);
+
+// Flow control default enabled. Can be disabled by setting
+// GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
+bool g_flow_control_enabled = true;
+
+/*******************************************************************************
+ * CONSTRUCTION/DESTRUCTION/REFCOUNTING
+ */
+
+grpc_chttp2_transport::~grpc_chttp2_transport() {
+ size_t i;
+
+ if (channelz_socket != nullptr) {
+ channelz_socket.reset();
+ }
+
+ grpc_endpoint_destroy(ep);
+
+ grpc_slice_buffer_destroy_internal(&qbuf);
+
+ grpc_slice_buffer_destroy_internal(&outbuf);
+ grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
+
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
+ // ContextList::Execute follows semantics of a callback function and does not
+ // take a ref on error
+ grpc_core::ContextList::Execute(cl, nullptr, error);
+ GRPC_ERROR_UNREF(error);
+ cl = nullptr;
+
+ grpc_slice_buffer_destroy_internal(&read_buffer);
+ grpc_chttp2_hpack_parser_destroy(&hpack_parser);
+ grpc_chttp2_goaway_parser_destroy(&goaway_parser);
+
+ for (i = 0; i < STREAM_LIST_COUNT; i++) {
+ GPR_ASSERT(lists[i].head == nullptr);
+ GPR_ASSERT(lists[i].tail == nullptr);
+ }
+
+ GRPC_ERROR_UNREF(goaway_error);
+
+ GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
+
+ grpc_chttp2_stream_map_destroy(&stream_map);
+ grpc_connectivity_state_destroy(&channel_callback.state_tracker);
+
+ GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
+
+ cancel_pings(this,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
+
+ while (write_cb_pool) {
+ grpc_chttp2_write_cb* next = write_cb_pool->next;
+ gpr_free(write_cb_pool);
+ write_cb_pool = next;
+ }
+
+ flow_control.Destroy();
+
+ GRPC_ERROR_UNREF(closed_with_error);
+ gpr_free(ping_acks);
+ gpr_free(peer_string);
+}
+
+static const grpc_transport_vtable* get_vtable(void);
+
+/* Returns whether bdp is enabled */
+static bool read_channel_args(grpc_chttp2_transport* t,
+ const grpc_channel_args* channel_args,
+ bool is_client) {
+ bool enable_bdp = true;
+ bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
+ size_t i;
+ int j;
+
+ for (i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ if ((t->next_stream_id & 1) != (value & 1)) {
+ gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
+ is_client ? "client" : "server");
+ } else {
+ t->next_stream_id = static_cast<uint32_t>(value);
+ }
+ }
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ grpc_chttp2_hpack_compressor_set_max_usable_size(
+ &t->hpack_compressor, static_cast<uint32_t>(value));
+ }
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ {g_default_max_pings_without_data, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
+ t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
+ &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_sent_ping_interval_without_data =
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{
+ g_default_min_sent_ping_interval_without_data_ms, 0,
+ INT_MAX});
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_recv_ping_interval_without_data =
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{
+ g_default_min_recv_ping_interval_without_data_ms, 0,
+ INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
+ t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
+ &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
+ enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{t->is_client
+ ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
+ t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{t->is_client
+ ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
+ t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ t->keepalive_permit_without_calls = static_cast<uint32_t>(
+ grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_OPTIMIZATION_TARGET)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s should be a string",
+ GRPC_ARG_OPTIMIZATION_TARGET);
+ } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 ==
+ strcmp(channel_args->args[i].value.string, "throughput")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
+ } else {
+ gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
+ GRPC_ARG_OPTIMIZATION_TARGET,
+ channel_args->args[i].value.string);
+ }
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
+ channelz_enabled = grpc_channel_arg_get_bool(
+ &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
+ } else {
+ static const struct {
+ const char* channel_arg_name;
+ grpc_chttp2_setting_id setting_id;
+ grpc_integer_options integer_options;
+ bool availability[2] /* server, client */;
+ } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ {-1, 0, INT32_MAX},
+ {true, false}},
+ {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
+ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_MAX_METADATA_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ {-1, 16384, 16777215},
+ {true, true}},
+ {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
+ {1, 0, 1},
+ {true, true}},
+ {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ {-1, 5, INT32_MAX},
+ {true, true}}};
+ for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
+ if (0 == strcmp(channel_args->args[i].key,
+ settings_map[j].channel_arg_name)) {
+ if (!settings_map[j].availability[is_client]) {
+ gpr_log(GPR_DEBUG, "%s is not available on %s",
+ settings_map[j].channel_arg_name,
+ is_client ? "clients" : "servers");
+ } else {
+ int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i], settings_map[j].integer_options);
+ if (value >= 0) {
+ queue_setting_update(t, settings_map[j].setting_id,
+ static_cast<uint32_t>(value));
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+ if (channelz_enabled) {
+ // TODO(ncteisen): add an API to endpoint to query for local addr, and pass
+ // it in here, so SocketNode knows its own address.
+ char* socket_name = nullptr;
+ gpr_asprintf(&socket_name, "%s %s", get_vtable()->name, t->peer_string);
+ t->channelz_socket =
+ grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
+ grpc_core::UniquePtr<char>(),
+ grpc_core::UniquePtr<char>(gpr_strdup(t->peer_string)),
+ grpc_core::UniquePtr<char>(socket_name));
+ }
+ return enable_bdp;
+}
+
+static void init_transport_closures(grpc_chttp2_transport* t) {
+ GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
+ destructive_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
+ t, grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
+ next_bdp_ping_timer_expired_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
+ t, grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
+ start_keepalive_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
+ finish_keepalive_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
+ keepalive_watchdog_fired_locked, t,
+ grpc_combiner_scheduler(t->combiner));
+}
+
+static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
+ if (t->is_client) {
+ t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_client_keepalive_time_ms;
+ t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_client_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_client_keepalive_permit_without_calls;
+ } else {
+ t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_server_keepalive_time_ms;
+ t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_server_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_server_keepalive_permit_without_calls;
+ }
+}
+
+static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
+ t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
+ t->ping_policy.min_sent_ping_interval_without_data =
+ g_default_min_sent_ping_interval_without_data_ms;
+ t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
+ t->ping_policy.min_recv_ping_interval_without_data =
+ g_default_min_recv_ping_interval_without_data_ms;
+}
+
+static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
+ if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(&t->keepalive_ping_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ &t->init_keepalive_ping_locked);
+ } else {
+ /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
+ inflight keeaplive timers */
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
+ }
+}
+
+grpc_chttp2_transport::grpc_chttp2_transport(
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
+ grpc_resource_user* resource_user)
+ : refs(1, &grpc_trace_chttp2_refcount),
+ ep(ep),
+ peer_string(grpc_endpoint_get_peer(ep)),
+ resource_user(resource_user),
+ combiner(grpc_combiner_create()),
+ is_client(is_client),
+ next_stream_id(is_client ? 1 : 2),
+ deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
+ GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
+ GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
+ base.vtable = get_vtable();
+ /* 8 is a random stab in the dark as to a good initial size: it's small enough
+ that it shouldn't waste memory for infrequently used connections, yet
+ large enough that the exponential growth should happen nicely when it's
+ needed.
+ TODO(ctiller): tune this */
+ grpc_chttp2_stream_map_init(&stream_map, 8);
+
+ grpc_slice_buffer_init(&read_buffer);
+ grpc_connectivity_state_init(
+ &channel_callback.state_tracker, GRPC_CHANNEL_READY,
+ is_client ? "client_transport" : "server_transport");
+ grpc_slice_buffer_init(&outbuf);
+ if (is_client) {
+ grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
+ GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ }
+ grpc_chttp2_hpack_compressor_init(&hpack_compressor);
+ grpc_slice_buffer_init(&qbuf);
+ /* copy in initial settings to all setting sets */
+ size_t i;
+ int j;
+ for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
+ for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
+ settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
+ }
+ }
+ grpc_chttp2_hpack_parser_init(&hpack_parser);
+ grpc_chttp2_goaway_parser_init(&goaway_parser);
+
+ init_transport_closures(this);
+
+ /* configure http2 the way we like it */
+ if (is_client) {
+ queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ }
+ queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ DEFAULT_MAX_HEADER_LIST_SIZE);
+ queue_setting_update(this,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
+
+ configure_transport_ping_policy(this);
+ init_transport_keepalive_settings(this);
+
+ bool enable_bdp = true;
+ if (channel_args) {
+ enable_bdp = read_channel_args(this, channel_args, is_client);
+ }
+
+ if (g_flow_control_enabled) {
+ flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
+ enable_bdp);
+ } else {
+ flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
+ enable_bdp = false;
+ }
+
+ /* No pings allowed before receiving a header or data frame. */
+ ping_state.pings_before_data_required = 0;
+ ping_state.is_delayed_ping_timer_set = false;
+ ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
+
+ ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
+ ping_recv_state.ping_strikes = 0;
+
+ init_keepalive_pings_if_enabled(this);
+
+ if (enable_bdp) {
+ GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping");
+ schedule_bdp_ping_locked(this);
+ grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
+ nullptr);
+ }
+
+ grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
+ post_benign_reclaimer(this);
+}
+
+static void destroy_transport_locked(void* tp, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+ t->destroying = 1;
+ close_transport_locked(
+ t, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
+ // Must be the last line.
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
+}
+
+static void destroy_transport(grpc_transport* gt) {
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+static void close_transport_locked(grpc_chttp2_transport* t,
+ grpc_error* error) {
+ end_all_the_calls(t, GRPC_ERROR_REF(error));
+ cancel_pings(t, GRPC_ERROR_REF(error));
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
+ if (!grpc_error_has_clear_grpc_status(error)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
+ if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ if (t->close_transport_on_writes_finished == nullptr) {
+ t->close_transport_on_writes_finished =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Delayed close due to in-progress write");
+ }
+ t->close_transport_on_writes_finished =
+ grpc_error_add_child(t->close_transport_on_writes_finished, error);
+ return;
+ }
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ t->closed_with_error = GRPC_ERROR_REF(error);
+ connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
+ if (t->ping_state.is_delayed_ping_timer_set) {
+ grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
+ }
+ if (t->have_next_bdp_ping_timer) {
+ grpc_timer_cancel(&t->next_bdp_ping_timer);
+ }
+ switch (t->keepalive_state) {
+ case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
+ grpc_timer_cancel(&t->keepalive_ping_timer);
+ break;
+ case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
+ grpc_timer_cancel(&t->keepalive_ping_timer);
+ grpc_timer_cancel(&t->keepalive_watchdog_timer);
+ break;
+ case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
+ case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
+ /* keepalive timers are not set in these two states */
+ break;
+ }
+
+ /* flush writable stream list to avoid dangling references */
+ grpc_chttp2_stream* s;
+ while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
+ }
+ GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
+ grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
+ }
+ if (t->notify_on_receive_settings != nullptr) {
+ GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
+ t->notify_on_receive_settings = nullptr;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+#ifndef NDEBUG
+void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
+ grpc_stream_ref(s->refcount, reason);
+}
+void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
+ grpc_stream_unref(s->refcount, reason);
+}
+#else
+void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
+ grpc_stream_ref(s->refcount);
+}
+void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
+ grpc_stream_unref(s->refcount);
+}
+#endif
+
+grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
+ /* We reserve one 'active stream' that's dropped when the stream is
+ read-closed. The others are for Chttp2IncomingByteStreams that are
+ actively reading */
+ GRPC_CHTTP2_STREAM_REF(s, "chttp2");
+ GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
+}
+
+grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
+ grpc_stream_refcount* refcount,
+ const void* server_data,
+ grpc_core::Arena* arena)
+ : t(t),
+ refcount(refcount),
+ reffer(this),
+ metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
+ grpc_chttp2_incoming_metadata_buffer(arena)} {
+ if (server_data) {
+ id = static_cast<uint32_t>((uintptr_t)server_data);
+ *t->accepting_stream = this;
+ grpc_chttp2_stream_map_add(&t->stream_map, id, this);
+ post_destructive_reclaimer(t);
+ }
+ if (t->flow_control->flow_control_enabled()) {
+ flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
+ static_cast<grpc_core::chttp2::TransportFlowControl*>(
+ t->flow_control.get()),
+ this);
+ } else {
+ flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
+ }
+
+ grpc_slice_buffer_init(&frame_storage);
+ grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_init(&flow_controlled_buffer);
+
+ GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
+ grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this,
+ grpc_combiner_scheduler(t->combiner));
+}
+
+grpc_chttp2_stream::~grpc_chttp2_stream() {
+ if (t->channelz_socket != nullptr) {
+ if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
+ t->channelz_socket->RecordStreamSucceeded();
+ } else {
+ t->channelz_socket->RecordStreamFailed();
+ }
+ }
+
+ GPR_ASSERT((write_closed && read_closed) || id == 0);
+ if (id != 0) {
+ GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
+ }
+
+ grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_destroy_internal(&frame_storage);
+ if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
+ grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
+ }
+ if (stream_decompression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
+ grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
+ }
+
+ grpc_chttp2_list_remove_stalled_by_transport(t, this);
+ grpc_chttp2_list_remove_stalled_by_stream(t, this);
+
+ for (int i = 0; i < STREAM_LIST_COUNT; i++) {
+ if (GPR_UNLIKELY(included[i])) {
+ gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
+ t->is_client ? "client" : "server", id, i);
+ abort();
+ }
+ }
+
+ GPR_ASSERT(send_initial_metadata_finished == nullptr);
+ GPR_ASSERT(fetching_send_message == nullptr);
+ GPR_ASSERT(send_trailing_metadata_finished == nullptr);
+ GPR_ASSERT(recv_initial_metadata_ready == nullptr);
+ GPR_ASSERT(recv_message_ready == nullptr);
+ GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
+ grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
+ GRPC_ERROR_UNREF(read_closed_error);
+ GRPC_ERROR_UNREF(write_closed_error);
+ GRPC_ERROR_UNREF(byte_stream_error);
+
+ flow_control.Destroy();
+
+ if (t->resource_user != nullptr) {
+ grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
+ }
+
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
+ GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
+}
+
+static int init_stream(grpc_transport* gt, grpc_stream* gs,
+ grpc_stream_refcount* refcount, const void* server_data,
+ grpc_core::Arena* arena) {
+ GPR_TIMER_SCOPE("init_stream", 0);
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
+ return 0;
+}
+
+static void destroy_stream_locked(void* sp, grpc_error* error) {
+ GPR_TIMER_SCOPE("destroy_stream", 0);
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
+ s->~grpc_chttp2_stream();
+}
+
+static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
+ grpc_closure* then_schedule_closure) {
+ GPR_TIMER_SCOPE("destroy_stream", 0);
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
+ if (s->stream_compression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
+ s->stream_compression_ctx != nullptr) {
+ grpc_stream_compression_context_destroy(s->stream_compression_ctx);
+ s->stream_compression_ctx = nullptr;
+ }
+ if (s->stream_decompression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
+ s->stream_decompression_ctx != nullptr) {
+ grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
+ s->stream_decompression_ctx = nullptr;
+ }
+
+ s->destroy_stream_arg = then_schedule_closure;
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
+ uint32_t id) {
+ if (t->channel_callback.accept_stream == nullptr) {
+ return nullptr;
+ }
+ // Don't accept the stream if memory quota doesn't allow. Note that we should
+ // simply refuse the stream here instead of canceling the stream after it's
+ // accepted since the latter will create the call which costs much memory.
+ if (t->resource_user != nullptr &&
+ !grpc_resource_user_safe_alloc(t->resource_user,
+ GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
+ gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
+ grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
+ nullptr);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
+ return nullptr;
+ }
+ grpc_chttp2_stream* accepting = nullptr;
+ GPR_ASSERT(t->accepting_stream == nullptr);
+ t->accepting_stream = &accepting;
+ t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
+ &t->base,
+ (void*)static_cast<uintptr_t>(id));
+ t->accepting_stream = nullptr;
+ return accepting;
+}
+
+/*******************************************************************************
+ * OUTPUT PROCESSING
+ */
+
+static const char* write_state_name(grpc_chttp2_write_state st) {
+ switch (st) {
+ case GRPC_CHTTP2_WRITE_STATE_IDLE:
+ return "IDLE";
+ case GRPC_CHTTP2_WRITE_STATE_WRITING:
+ return "WRITING";
+ case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
+ return "WRITING+MORE";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_write_state(grpc_chttp2_transport* t,
+ grpc_chttp2_write_state st, const char* reason) {
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
+ t->is_client ? "CLIENT" : "SERVER", t->peer_string,
+ write_state_name(t->write_state), write_state_name(st), reason));
+ t->write_state = st;
+ /* If the state is being reset back to idle, it means a write was just
+ * finished. Make sure all the run_after_write closures are scheduled.
+ *
+ * This is also our chance to close the transport if the transport was marked
+ * to be closed after all writes finish (for example, if we received a go-away
+ * from peer while we had some pending writes) */
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+ if (t->close_transport_on_writes_finished != nullptr) {
+ grpc_error* err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = nullptr;
+ close_transport_locked(t, err);
+ }
+ }
+}
+
+static void inc_initiate_write_reason(
+ grpc_chttp2_initiate_write_reason reason) {
+ switch (reason) {
+ case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
+ break;
+ }
+}
+
+void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
+ grpc_chttp2_initiate_write_reason reason) {
+ GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
+
+ switch (t->write_state) {
+ case GRPC_CHTTP2_WRITE_STATE_IDLE:
+ inc_initiate_write_reason(reason);
+ set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ grpc_chttp2_initiate_write_reason_string(reason));
+ t->is_first_write_in_batch = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+ /* Note that the 'write_action_begin_locked' closure is being scheduled
+ * on the 'finally_scheduler' of t->combiner. This means that
+ * 'write_action_begin_locked' is called only *after* all the other
+ * closures (some of which are potentially initiating more writes on the
+ * transport) are executed on the t->combiner.
+ *
+ * The reason for scheduling on finally_scheduler is to make sure we batch
+ * as many writes as possible. 'write_action_begin_locked' is the function
+ * that gathers all the relevant bytes (which are at various places in the
+ * grpc_chttp2_transport structure) and append them to 'outbuf' field in
+ * grpc_chttp2_transport thereby batching what would have been potentially
+ * multiple write operations.
+ *
+ * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
+ * It does not call the endpoint to write the bytes. That is done by the
+ * 'write_action' (which is scheduled by 'write_action_begin_locked') */
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+ break;
+ case GRPC_CHTTP2_WRITE_STATE_WRITING:
+ set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+ grpc_chttp2_initiate_write_reason_string(reason));
+ break;
+ case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
+ break;
+ }
+}
+
+void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ if (t->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
+ }
+}
+
+static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
+ bool early_results_scheduled,
+ bool partial_write) {
+ // If we're already in a background poller, don't offload this to an executor
+ if (grpc_iomgr_is_any_background_poller_thread()) {
+ return grpc_schedule_on_exec_ctx;
+ }
+ /* if it's not the first write in a batch, always offload to the executor:
+ we'll probably end up queuing against the kernel anyway, so we'll likely
+ get better latency overall if we switch writing work elsewhere and continue
+ with application work above */
+ if (!t->is_first_write_in_batch) {
+ return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
+ }
+ /* equivalently, if it's a partial write, we *know* we're going to be taking a
+ thread jump to write it because of the above, may as well do so
+ immediately */
+ if (partial_write) {
+ return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
+ }
+ switch (t->opt_target) {
+ case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
+ /* executor gives us the largest probability of being able to batch a
+ * write with others on this transport */
+ return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
+ case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
+ return grpc_schedule_on_exec_ctx;
+ }
+ GPR_UNREACHABLE_CODE(return nullptr);
+}
+
+#define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
+static const char* begin_writing_desc(bool partial, bool inlined) {
+ switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
+ case WRITE_STATE_TUPLE_TO_INT(false, false):
+ return "begin write in background";
+ case WRITE_STATE_TUPLE_TO_INT(false, true):
+ return "begin write in current thread";
+ case WRITE_STATE_TUPLE_TO_INT(true, false):
+ return "begin partial write in background";
+ case WRITE_STATE_TUPLE_TO_INT(true, true):
+ return "begin partial write in current thread";
+ }
+ GPR_UNREACHABLE_CODE(return "bad state tuple");
+}
+
+static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
+ GPR_TIMER_SCOPE("write_action_begin_locked", 0);
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
+ GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
+ grpc_chttp2_begin_write_result r;
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ r.writing = false;
+ } else {
+ r = grpc_chttp2_begin_write(t);
+ }
+ if (r.writing) {
+ if (r.partial) {
+ GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
+ }
+ if (!t->is_first_write_in_batch) {
+ GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
+ }
+ grpc_closure_scheduler* scheduler =
+ write_scheduler(t, r.early_results_scheduled, r.partial);
+ if (scheduler != grpc_schedule_on_exec_ctx) {
+ GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
+ }
+ set_write_state(
+ t,
+ r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
+ : GRPC_CHTTP2_WRITE_STATE_WRITING,
+ begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
+ GRPC_ERROR_NONE);
+ if (t->reading_paused_on_pending_induced_frames) {
+ GPR_ASSERT(t->num_pending_induced_frames == 0);
+ /* We had paused reading, because we had many induced frames (SETTINGS
+ * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
+ * been able to flush qbuf, we can resume reading. */
+ GRPC_CHTTP2_IF_TRACING(gpr_log(
+ GPR_INFO,
+ "transport %p : Resuming reading after being paused due to too "
+ "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
+ t));
+ t->reading_paused_on_pending_induced_frames = false;
+ continue_read_action_locked(t);
+ }
+ } else {
+ GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
+ set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
+ }
+}
+
+static void write_action(void* gt, grpc_error* error) {
+ GPR_TIMER_SCOPE("write_action", 0);
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
+ void* cl = t->cl;
+ t->cl = nullptr;
+ grpc_endpoint_write(
+ t->ep, &t->outbuf,
+ GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
+ grpc_combiner_scheduler(t->combiner)),
+ cl);
+}
+
+/* Callback from the grpc_endpoint after bytes have been written by calling
+ * sendmsg */
+static void write_action_end_locked(void* tp, grpc_error* error) {
+ GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+
+ bool closed = false;
+ if (error != GRPC_ERROR_NONE) {
+ close_transport_locked(t, GRPC_ERROR_REF(error));
+ closed = true;
+ }
+
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
+ closed = true;
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ close_transport_locked(
+ t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
+ }
+ }
+
+ switch (t->write_state) {
+ case GRPC_CHTTP2_WRITE_STATE_IDLE:
+ GPR_UNREACHABLE_CODE(break);
+ case GRPC_CHTTP2_WRITE_STATE_WRITING:
+ GPR_TIMER_MARK("state=writing", 0);
+ set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
+ break;
+ case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
+ GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
+ set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
+ t->is_first_write_in_batch = false;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+ // If the transport is closed, we will retry writing on the endpoint
+ // and next write may contain part of the currently serialized frames.
+ // So, we should only call the run_after_write callbacks when the next
+ // write finishes, or the callbacks will be invoked when the stream is
+ // closed.
+ if (!closed) {
+ GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+ }
+ GRPC_CLOSURE_RUN(
+ GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+ break;
+ }
+
+ grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
+}
+
+// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
+// If the change needs to occur immediately, manually initiate a write.
+static void queue_setting_update(grpc_chttp2_transport* t,
+ grpc_chttp2_setting_id id, uint32_t value) {
+ const grpc_chttp2_setting_parameters* sp =
+ &grpc_chttp2_settings_parameters[id];
+ uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
+ if (use_value != value) {
+ gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
+ value, use_value);
+ }
+ if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
+ t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
+ t->dirtied_local_settings = 1;
+ }
+}
+
+void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
+ uint32_t goaway_error,
+ uint32_t last_stream_id,
+ const grpc_slice& goaway_text) {
+ // Discard the error from a previous goaway frame (if any)
+ if (t->goaway_error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(t->goaway_error);
+ }
+ t->goaway_error = grpc_error_set_str(
+ grpc_error_set_int(
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
+ GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_RAW_BYTES, goaway_text);
+
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
+ last_stream_id));
+ /* We want to log this irrespective of whether http tracing is enabled if we
+ * received a GOAWAY with a non NO_ERROR code. */
+ if (goaway_error != GRPC_HTTP2_NO_ERROR) {
+ gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
+ goaway_error, grpc_error_string(t->goaway_error));
+ }
+ /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
+ * data equal to "too_many_pings", it should log the occurrence at a log level
+ * that is enabled by default and double the configured KEEPALIVE_TIME used
+ * for new connections on that channel. */
+ if (GPR_UNLIKELY(t->is_client &&
+ goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
+ grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
+ gpr_log(GPR_ERROR,
+ "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
+ "data equal to \"too_many_pings\"");
+ double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
+ t->keepalive_time =
+ current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
+ ? GRPC_MILLIS_INF_FUTURE
+ : static_cast<grpc_millis>(current_keepalive_time_ms *
+ KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
+ }
+
+ /* lie: use transient failure from the transport to indicate goaway has been
+ * received */
+ connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
+}
+
+static void maybe_start_some_streams(grpc_chttp2_transport* t) {
+ grpc_chttp2_stream* s;
+ /* cancel out streams that haven't yet started if we have received a GOAWAY */
+ if (t->goaway_error != GRPC_ERROR_NONE) {
+ while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+ return;
+ }
+ /* start streams where we have free grpc_chttp2_stream ids and free
+ * concurrency */
+ while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
+ grpc_chttp2_stream_map_size(&t->stream_map) <
+ t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
+ grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
+ /* safe since we can't (legally) be parsing this stream yet */
+ GRPC_CHTTP2_IF_TRACING(gpr_log(
+ GPR_INFO,
+ "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
+ t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
+
+ GPR_ASSERT(s->id == 0);
+ s->id = t->next_stream_id;
+ t->next_stream_id += 2;
+
+ if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
+ connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "no_more_stream_ids");
+ }
+
+ grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
+ post_destructive_reclaimer(t);
+ grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
+ }
+ /* cancel out streams that will never be started */
+ if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
+ while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+ }
+}
+
+/* Flag that this closure barrier may be covering a write in a pollset, and so
+ we should not complete this closure until we can prove that the write got
+ scheduled */
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
+/* First bit of the reference count, stored in the high order bits (with the low
+ bits being used for flags defined above) */
+#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
+
+static grpc_closure* add_closure_barrier(grpc_closure* closure) {
+ closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
+ return closure;
+}
+
+static void null_then_sched_closure(grpc_closure** closure) {
+ grpc_closure* c = *closure;
+ *closure = nullptr;
+ GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
+}
+
+void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s,
+ grpc_closure** pclosure,
+ grpc_error* error, const char* desc) {
+ grpc_closure* closure = *pclosure;
+ *pclosure = nullptr;
+ if (closure == nullptr) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ const char* errstr = grpc_error_string(error);
+ gpr_log(
+ GPR_INFO,
+ "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
+ "write_state=%s",
+ t, closure,
+ static_cast<int>(closure->next_data.scratch /
+ CLOSURE_BARRIER_FIRST_REF_BIT),
+ static_cast<int>(closure->next_data.scratch %
+ CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr, write_state_name(t->write_state));
+ }
+ if (error != GRPC_ERROR_NONE) {
+ if (closure->error_data.error == GRPC_ERROR_NONE) {
+ closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Error in HTTP transport completing operation");
+ closure->error_data.error = grpc_error_set_str(
+ closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(t->peer_string));
+ }
+ closure->error_data.error =
+ grpc_error_add_child(closure->error_data.error, error);
+ }
+ if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
+ // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
+ // closures earlier than when it is safe to do so.
+ GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
+ }
+}
+
+static bool contains_non_ok_status(grpc_metadata_batch* batch) {
+ if (batch->idx.named.grpc_status != nullptr) {
+ return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
+ GRPC_MDELEM_GRPC_STATUS_0);
+ }
+ return false;
+}
+
+static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ if (s->id != 0 && (!s->write_buffering ||
+ s->flow_controlled_buffer.length > t->write_buffer_size)) {
+ grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
+ }
+}
+
+static void add_fetched_slice_locked(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ s->fetched_send_message_length +=
+ static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
+ grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
+ maybe_become_writable_due_to_send_msg(t, s);
+}
+
+static void continue_fetching_send_locked(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ for (;;) {
+ if (s->fetching_send_message == nullptr) {
+ /* Stream was cancelled before message fetch completed */
+ abort(); /* TODO(ctiller): what cleanup here? */
+ return; /* early out */
+ }
+ if (s->fetched_send_message_length == s->fetching_send_message->length()) {
+ int64_t notify_offset = s->next_message_end_offset;
+ if (notify_offset <= s->flow_controlled_bytes_written) {
+ grpc_chttp2_complete_closure_step(
+ t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
+ "fetching_send_message_finished");
+ } else {
+ grpc_chttp2_write_cb* cb = t->write_cb_pool;
+ if (cb == nullptr) {
+ cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
+ } else {
+ t->write_cb_pool = cb->next;
+ }
+ cb->call_at_byte = notify_offset;
+ cb->closure = s->fetching_send_message_finished;
+ s->fetching_send_message_finished = nullptr;
+ grpc_chttp2_write_cb** list =
+ s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
+ ? &s->on_write_finished_cbs
+ : &s->on_flow_controlled_cbs;
+ cb->next = *list;
+ *list = cb;
+ }
+ s->fetching_send_message.reset();
+ return; /* early out */
+ } else if (s->fetching_send_message->Next(UINT32_MAX,
+ &s->complete_fetch_locked)) {
+ grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
+ if (error != GRPC_ERROR_NONE) {
+ s->fetching_send_message.reset();
+ grpc_chttp2_cancel_stream(t, s, error);
+ } else {
+ add_fetched_slice_locked(t, s);
+ }
+ }
+ }
+}
+
+static void complete_fetch_locked(void* gs, grpc_error* error) {
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
+ grpc_chttp2_transport* t = s->t;
+ if (error == GRPC_ERROR_NONE) {
+ error = s->fetching_send_message->Pull(&s->fetching_slice);
+ if (error == GRPC_ERROR_NONE) {
+ add_fetched_slice_locked(t, s);
+ continue_fetching_send_locked(t, s);
+ }
+ }
+ if (error != GRPC_ERROR_NONE) {
+ s->fetching_send_message.reset();
+ grpc_chttp2_cancel_stream(t, s, error);
+ }
+}
+
+static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
+ md = md->next) {
+ char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
+ char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", key, value);
+ gpr_free(key);
+ gpr_free(value);
+ }
+}
+
+static void perform_stream_op_locked(void* stream_op,
+ grpc_error* error_ignored) {
+ GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
+
+ grpc_transport_stream_op_batch* op =
+ static_cast<grpc_transport_stream_op_batch*>(stream_op);
+ grpc_chttp2_stream* s =
+ static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
+ grpc_transport_stream_op_batch_payload* op_payload = op->payload;
+ grpc_chttp2_transport* t = s->t;
+
+ GRPC_STATS_INC_HTTP2_OP_BATCHES();
+
+ s->context = op->payload->context;
+ s->traced = op->is_traced;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ char* str = grpc_transport_stream_op_batch_string(op);
+ gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
+ op->on_complete);
+ gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
+ s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
+ s->id, t->is_client, false);
+ }
+ }
+
+ grpc_closure* on_complete = op->on_complete;
+ // on_complete will be null if and only if there are no send ops in the batch.
+ if (on_complete != nullptr) {
+ // This batch has send ops. Use final_data as a barrier until enqueue time;
+ // the initial counter is dropped at the end of this function.
+ on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
+ on_complete->error_data.error = GRPC_ERROR_NONE;
+ }
+
+ if (op->cancel_stream) {
+ GRPC_STATS_INC_HTTP2_OP_CANCEL();
+ grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
+ }
+
+ if (op->send_initial_metadata) {
+ if (t->is_client && t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordStreamStartedFromLocal();
+ }
+ GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
+ GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
+
+ /* Identify stream compression */
+ if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
+ .content_encoding == nullptr ||
+ grpc_stream_compression_method_parse(
+ GRPC_MDVALUE(
+ op_payload->send_initial_metadata.send_initial_metadata->idx
+ .named.content_encoding->md),
+ true, &s->stream_compression_method) == 0) {
+ s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
+ }
+ if (s->stream_compression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
+ s->uncompressed_data_size = 0;
+ s->stream_compression_ctx = nullptr;
+ grpc_slice_buffer_init(&s->compressed_data_buffer);
+ }
+ s->send_initial_metadata_finished = add_closure_barrier(on_complete);
+ s->send_initial_metadata =
+ op_payload->send_initial_metadata.send_initial_metadata;
+ const size_t metadata_size =
+ grpc_metadata_batch_size(s->send_initial_metadata);
+ const size_t metadata_peer_limit =
+ t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (t->is_client) {
+ s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
+ }
+ if (metadata_size > metadata_peer_limit) {
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(
+ grpc_error_set_int(
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "to-be-sent initial metadata size "
+ "exceeds peer limit"),
+ GRPC_ERROR_INT_SIZE,
+ static_cast<intptr_t>(metadata_size)),
+ GRPC_ERROR_INT_LIMIT,
+ static_cast<intptr_t>(metadata_peer_limit)),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
+ } else {
+ if (contains_non_ok_status(s->send_initial_metadata)) {
+ s->seen_error = true;
+ }
+ if (!s->write_closed) {
+ if (t->is_client) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(s->id == 0);
+ grpc_chttp2_list_add_waiting_for_concurrency(t, s);
+ maybe_start_some_streams(t);
+ } else {
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+ } else {
+ GPR_ASSERT(s->id != 0);
+ grpc_chttp2_mark_stream_writable(t, s);
+ if (!(op->send_message &&
+ (op->payload->send_message.send_message->flags() &
+ GRPC_WRITE_BUFFER_HINT))) {
+ grpc_chttp2_initiate_write(
+ t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
+ }
+ }
+ } else {
+ s->send_initial_metadata = nullptr;
+ grpc_chttp2_complete_closure_step(
+ t, s, &s->send_initial_metadata_finished,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Attempt to send initial metadata after stream was closed",
+ &s->write_closed_error, 1),
+ "send_initial_metadata_finished");
+ }
+ }
+ if (op_payload->send_initial_metadata.peer_string != nullptr) {
+ gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
+ (gpr_atm)t->peer_string);
+ }
+ }
+
+ if (op->send_message) {
+ GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
+ t->num_messages_in_next_write++;
+ GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
+ op->payload->send_message.send_message->length());
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
+ s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
+ if (s->write_closed) {
+ op->payload->send_message.stream_write_closed = true;
+ // We should NOT return an error here, so as to avoid a cancel OP being
+ // started. The surface layer will notice that the stream has been closed
+ // for writes and fail the send message op.
+ op->payload->send_message.send_message.reset();
+ grpc_chttp2_complete_closure_step(
+ t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
+ "fetching_send_message_finished");
+ } else {
+ GPR_ASSERT(s->fetching_send_message == nullptr);
+ uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
+ &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
+ uint32_t flags = op_payload->send_message.send_message->flags();
+ frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
+ size_t len = op_payload->send_message.send_message->length();
+ frame_hdr[1] = static_cast<uint8_t>(len >> 24);
+ frame_hdr[2] = static_cast<uint8_t>(len >> 16);
+ frame_hdr[3] = static_cast<uint8_t>(len >> 8);
+ frame_hdr[4] = static_cast<uint8_t>(len);
+ s->fetching_send_message =
+ std::move(op_payload->send_message.send_message);
+ s->fetched_send_message_length = 0;
+ s->next_message_end_offset =
+ s->flow_controlled_bytes_written +
+ static_cast<int64_t>(s->flow_controlled_buffer.length) +
+ static_cast<int64_t>(len);
+ if (flags & GRPC_WRITE_BUFFER_HINT) {
+ s->next_message_end_offset -= t->write_buffer_size;
+ s->write_buffering = true;
+ } else {
+ s->write_buffering = false;
+ }
+ continue_fetching_send_locked(t, s);
+ maybe_become_writable_due_to_send_msg(t, s);
+ }
+ }
+
+ if (op->send_trailing_metadata) {
+ GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
+ GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
+ s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
+ s->send_trailing_metadata =
+ op_payload->send_trailing_metadata.send_trailing_metadata;
+ s->write_buffering = false;
+ const size_t metadata_size =
+ grpc_metadata_batch_size(s->send_trailing_metadata);
+ const size_t metadata_peer_limit =
+ t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (metadata_size > metadata_peer_limit) {
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(
+ grpc_error_set_int(
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "to-be-sent trailing metadata size "
+ "exceeds peer limit"),
+ GRPC_ERROR_INT_SIZE,
+ static_cast<intptr_t>(metadata_size)),
+ GRPC_ERROR_INT_LIMIT,
+ static_cast<intptr_t>(metadata_peer_limit)),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
+ } else {
+ if (contains_non_ok_status(s->send_trailing_metadata)) {
+ s->seen_error = true;
+ }
+ if (s->write_closed) {
+ s->send_trailing_metadata = nullptr;
+ grpc_chttp2_complete_closure_step(
+ t, s, &s->send_trailing_metadata_finished,
+ grpc_metadata_batch_is_empty(
+ op->payload->send_trailing_metadata.send_trailing_metadata)
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Attempt to send trailing metadata after "
+ "stream was closed"),
+ "send_trailing_metadata_finished");
+ } else if (s->id != 0) {
+ /* TODO(ctiller): check if there's flow control for any outstanding
+ bytes before going writable */
+ grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_initiate_write(
+ t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
+ }
+ }
+ }
+
+ if (op->recv_initial_metadata) {
+ GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
+ GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
+ s->recv_initial_metadata_ready =
+ op_payload->recv_initial_metadata.recv_initial_metadata_ready;
+ s->recv_initial_metadata =
+ op_payload->recv_initial_metadata.recv_initial_metadata;
+ s->trailing_metadata_available =
+ op_payload->recv_initial_metadata.trailing_metadata_available;
+ if (op_payload->recv_initial_metadata.peer_string != nullptr) {
+ gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
+ (gpr_atm)t->peer_string);
+ }
+ grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
+ }
+
+ if (op->recv_message) {
+ GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
+ size_t before = 0;
+ GPR_ASSERT(s->recv_message_ready == nullptr);
+ GPR_ASSERT(!s->pending_byte_stream);
+ s->recv_message_ready = op_payload->recv_message.recv_message_ready;
+ s->recv_message = op_payload->recv_message.recv_message;
+ if (s->id != 0) {
+ if (!s->read_closed) {
+ before = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer.length;
+ }
+ }
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ if (s->id != 0) {
+ if (!s->read_closed && s->frame_storage.length == 0) {
+ size_t after = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer_cached_length;
+ s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
+ before - after);
+ grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
+ }
+ }
+ }
+
+ if (op->recv_trailing_metadata) {
+ GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+ GPR_ASSERT(s->collecting_stats == nullptr);
+ s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
+ GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
+ s->recv_trailing_metadata_finished =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ s->recv_trailing_metadata =
+ op_payload->recv_trailing_metadata.recv_trailing_metadata;
+ s->final_metadata_requested = true;
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ }
+
+ if (on_complete != nullptr) {
+ grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
+ "op->on_complete");
+ }
+
+ GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
+}
+
+static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
+ grpc_transport_stream_op_batch* op) {
+ GPR_TIMER_SCOPE("perform_stream_op", 0);
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
+
+ if (!t->is_client) {
+ if (op->send_initial_metadata) {
+ grpc_millis deadline =
+ op->payload->send_initial_metadata.send_initial_metadata->deadline;
+ GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
+ }
+ if (op->send_trailing_metadata) {
+ grpc_millis deadline =
+ op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
+ GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
+ }
+ }
+
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ char* str = grpc_transport_stream_op_batch_string(op);
+ gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
+ gpr_free(str);
+ }
+
+ GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
+ op->handler_private.extra_arg = gs;
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
+ op, grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
+ /* callback remaining pings: they're not allowed to call into the transport,
+ and maybe they hold resources that need to be freed */
+ grpc_chttp2_ping_queue* pq = &t->ping_queue;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
+ grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void send_ping_locked(grpc_chttp2_transport* t,
+ grpc_closure* on_initiate, grpc_closure* on_ack) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
+ grpc_chttp2_ping_queue* pq = &t->ping_queue;
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
+ GRPC_ERROR_NONE);
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
+ GRPC_ERROR_NONE);
+}
+
+/*
+ * Specialized form of send_ping_locked for keepalive ping. If there is already
+ * a ping in progress, the keepalive ping would piggyback onto that ping,
+ * instead of waiting for that ping to complete and then starting a new ping.
+ */
+static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
+ grpc_chttp2_ping_queue* pq = &t->ping_queue;
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+ /* There is a ping in flight. Add yourself to the inflight closure list. */
+ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
+ &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
+ return;
+ }
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
+ &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+ &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
+}
+
+static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+ t->ping_state.is_delayed_ping_timer_set = false;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
+}
+
+void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
+ grpc_chttp2_ping_queue* pq = &t->ping_queue;
+ if (pq->inflight_id != id) {
+ char* from = grpc_endpoint_get_peer(t->ep);
+ gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
+ gpr_free(from);
+ return;
+ }
+ GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
+ }
+}
+
+static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
+ /* We want to log this irrespective of whether http tracing is enabled */
+ gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
+ grpc_error_string(error));
+ t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
+ grpc_http2_error_code http_error;
+ grpc_slice slice;
+ grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
+ &http_error, nullptr);
+ grpc_chttp2_goaway_append(t->last_new_stream_id,
+ static_cast<uint32_t>(http_error),
+ grpc_slice_ref_internal(slice), &t->qbuf);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
+ GRPC_ERROR_UNREF(error);
+}
+
+void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
+ if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
+ t->ping_policy.max_ping_strikes != 0) {
+ send_goaway(t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+ /*The transport will be closed after the write is done */
+ close_transport_locked(
+ t, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+}
+
+static void perform_transport_op_locked(void* stream_op,
+ grpc_error* error_ignored) {
+ grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
+ grpc_chttp2_transport* t =
+ static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
+
+ if (op->goaway_error) {
+ send_goaway(t, op->goaway_error);
+ }
+
+ if (op->set_accept_stream) {
+ t->channel_callback.accept_stream = op->set_accept_stream_fn;
+ t->channel_callback.accept_stream_user_data =
+ op->set_accept_stream_user_data;
+ }
+
+ if (op->bind_pollset) {
+ grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
+ }
+
+ if (op->bind_pollset_set) {
+ grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
+ }
+
+ if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
+ send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
+ }
+
+ if (op->on_connectivity_state_change != nullptr) {
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ }
+
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ close_transport_locked(t, op->disconnect_with_error);
+ }
+
+ GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
+
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
+}
+
+static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ char* msg = grpc_transport_op_string(op);
+ gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
+ gpr_free(msg);
+ }
+ op->handler_private.extra_arg = gt;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
+ GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+/*******************************************************************************
+ * INPUT PROCESSING - GENERAL
+ */
+
+void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ if (s->recv_initial_metadata_ready != nullptr &&
+ s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
+ if (s->seen_error) {
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ }
+ }
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
+ s->recv_initial_metadata);
+ null_then_sched_closure(&s->recv_initial_metadata_ready);
+ }
+}
+
+void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (s->recv_message_ready != nullptr) {
+ *s->recv_message = nullptr;
+ if (s->final_metadata_requested && s->seen_error) {
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ }
+ }
+ if (!s->pending_byte_stream) {
+ while (s->unprocessed_incoming_frames_buffer.length > 0 ||
+ s->frame_storage.length > 0) {
+ if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ &s->frame_storage);
+ s->unprocessed_incoming_frames_decompressed = false;
+ }
+ if (!s->unprocessed_incoming_frames_decompressed &&
+ s->stream_decompression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
+ GPR_ASSERT(s->decompressed_data_buffer.length == 0);
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ s->stream_decompression_method);
+ }
+ if (!grpc_stream_decompress(
+ s->stream_decompression_ctx,
+ &s->unprocessed_incoming_frames_buffer,
+ &s->decompressed_data_buffer, nullptr,
+ GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
+ &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Stream decompression error.");
+ } else {
+ s->decompressed_header_bytes += s->decompressed_data_buffer.length;
+ if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
+ s->decompressed_header_bytes = 0;
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
+ &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
+ s->recv_message);
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ s->stream_decompression_ctx);
+ s->stream_decompression_ctx = nullptr;
+ }
+ }
+ } else {
+ error = grpc_deframe_unprocessed_incoming_frames(
+ &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+ nullptr, s->recv_message);
+ }
+ if (error != GRPC_ERROR_NONE) {
+ s->seen_error = true;
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ break;
+ } else if (*s->recv_message != nullptr) {
+ break;
+ }
+ }
+ }
+ // save the length of the buffer before handing control back to application
+ // threads. Needed to support correct flow control bookkeeping
+ s->unprocessed_incoming_frames_buffer_cached_length =
+ s->unprocessed_incoming_frames_buffer.length;
+ if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
+ null_then_sched_closure(&s->recv_message_ready);
+ } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
+ *s->recv_message = nullptr;
+ null_then_sched_closure(&s->recv_message_ready);
+ }
+ GRPC_ERROR_UNREF(error);
+ }
+}
+
+void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s) {
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
+ s->write_closed) {
+ if (s->seen_error || !t->is_client) {
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ }
+ }
+ bool pending_data = s->pending_byte_stream ||
+ s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
+ !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
+ /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
+ * maybe decompress the next 5 bytes in the stream. */
+ if (s->stream_decompression_method ==
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
+ grpc_slice_buffer_move_first(
+ &s->frame_storage,
+ GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
+ &s->unprocessed_incoming_frames_buffer);
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ s->unprocessed_incoming_frames_decompressed = true;
+ pending_data = true;
+ }
+ } else {
+ bool end_of_context;
+ if (!s->stream_decompression_ctx) {
+ s->stream_decompression_ctx = grpc_stream_compression_context_create(
+ s->stream_decompression_method);
+ }
+ if (!grpc_stream_decompress(
+ s->stream_decompression_ctx, &s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer, nullptr,
+ GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
+ grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ &s->unprocessed_incoming_frames_buffer);
+ s->seen_error = true;
+ } else {
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ s->unprocessed_incoming_frames_decompressed = true;
+ pending_data = true;
+ }
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ s->stream_decompression_ctx);
+ s->stream_decompression_ctx = nullptr;
+ }
+ }
+ }
+ }
+ if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
+ s->recv_trailing_metadata_finished != nullptr) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
+ s->recv_trailing_metadata);
+ null_then_sched_closure(&s->recv_trailing_metadata_finished);
+ }
+ }
+}
+
+static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
+ grpc_error* error) {
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
+ grpc_chttp2_stream_map_delete(&t->stream_map, id));
+ GPR_DEBUG_ASSERT(s);
+ if (t->incoming_stream == s) {
+ t->incoming_stream = nullptr;
+ grpc_chttp2_parsing_become_skip_parser(t);
+ }
+ if (s->pending_byte_stream) {
+ if (s->on_next != nullptr) {
+ grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ }
+ bs->PublishError(error);
+ bs->Unref();
+ s->data_parser.parsing_frame = nullptr;
+ } else {
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
+ }
+ }
+
+ if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ post_benign_reclaimer(t);
+ if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
+ close_transport_locked(
+ t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Last stream closed after sending GOAWAY", &error, 1));
+ }
+ }
+ if (grpc_chttp2_list_remove_writable_stream(t, s)) {
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
+ }
+
+ GRPC_ERROR_UNREF(error);
+
+ maybe_start_some_streams(t);
+}
+
+void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_error* due_to_error) {
+ if (!t->is_client && !s->sent_trailing_metadata &&
+ grpc_error_has_clear_grpc_status(due_to_error)) {
+ close_from_api(t, s, due_to_error);
+ return;
+ }
+
+ if (!s->read_closed || !s->write_closed) {
+ if (s->id != 0) {
+ grpc_http2_error_code http_error;
+ grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
+ &http_error, nullptr);
+ grpc_chttp2_add_rst_stream_to_next_write(
+ t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
+ }
+ }
+ if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
+ s->seen_error = true;
+ }
+ grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
+}
+
+void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_error* error) {
+ grpc_status_code status;
+ grpc_slice slice;
+ grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
+ if (status != GRPC_STATUS_OK) {
+ s->seen_error = true;
+ }
+ /* stream_global->recv_trailing_metadata_finished gives us a
+ last chance replacement: we've received trailing metadata,
+ but something more important has become available to signal
+ to the upper layers - drop what we've got, and then publish
+ what we want - which is safe because we haven't told anyone
+ about the metadata yet */
+ if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
+ s->recv_trailing_metadata_finished != nullptr) {
+ char status_string[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(status, status_string);
+ GRPC_LOG_IF_ERROR("add_status",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_STATUS,
+ grpc_core::UnmanagedMemorySlice(status_string))));
+ if (!GRPC_SLICE_IS_EMPTY(slice)) {
+ GRPC_LOG_IF_ERROR(
+ "add_status_message",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ &s->metadata_buffer[1],
+ grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
+ }
+ s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ }
+
+ GRPC_ERROR_UNREF(error);
+}
+
+static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
+ if (error == GRPC_ERROR_NONE) return;
+ for (size_t i = 0; i < *nrefs; i++) {
+ if (error == refs[i]) {
+ return;
+ }
+ }
+ refs[*nrefs] = error;
+ ++*nrefs;
+}
+
+static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
+ const char* master_error_msg) {
+ grpc_error* refs[3];
+ size_t nrefs = 0;
+ add_error(s->read_closed_error, refs, &nrefs);
+ add_error(s->write_closed_error, refs, &nrefs);
+ add_error(extra_error, refs, &nrefs);
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (nrefs > 0) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
+ refs, nrefs);
+ }
+ GRPC_ERROR_UNREF(extra_error);
+ return error;
+}
+
+static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_chttp2_write_cb** list, grpc_error* error) {
+ while (*list) {
+ grpc_chttp2_write_cb* cb = *list;
+ *list = cb->next;
+ grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
+ "on_write_finished_cb");
+ cb->next = t->write_cb_pool;
+ t->write_cb_pool = cb;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, grpc_error* error) {
+ error =
+ removal_error(error, s, "Pending writes failed due to stream closure");
+ s->send_initial_metadata = nullptr;
+ grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
+ GRPC_ERROR_REF(error),
+ "send_initial_metadata_finished");
+
+ s->send_trailing_metadata = nullptr;
+ grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
+ GRPC_ERROR_REF(error),
+ "send_trailing_metadata_finished");
+
+ s->fetching_send_message.reset();
+ grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
+ GRPC_ERROR_REF(error),
+ "fetching_send_message_finished");
+ flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
+ flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
+}
+
+void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, int close_reads,
+ int close_writes, grpc_error* error) {
+ if (s->read_closed && s->write_closed) {
+ /* already closed */
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ bool closed_read = false;
+ bool became_closed = false;
+ if (close_reads && !s->read_closed) {
+ s->read_closed_error = GRPC_ERROR_REF(error);
+ s->read_closed = true;
+ closed_read = true;
+ }
+ if (close_writes && !s->write_closed) {
+ s->write_closed_error = GRPC_ERROR_REF(error);
+ s->write_closed = true;
+ grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
+ }
+ if (s->read_closed && s->write_closed) {
+ became_closed = true;
+ grpc_error* overall_error =
+ removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
+ if (s->id != 0) {
+ remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
+ } else {
+ /* Purge streams waiting on concurrency still waiting for id assignment */
+ grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
+ }
+ if (overall_error != GRPC_ERROR_NONE) {
+ grpc_chttp2_fake_status(t, s, overall_error);
+ }
+ }
+ if (closed_read) {
+ for (int i = 0; i < 2; i++) {
+ if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
+ s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
+ }
+ }
+ grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ }
+ if (became_closed) {
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_error* error) {
+ grpc_slice hdr;
+ grpc_slice status_hdr;
+ grpc_slice http_status_hdr;
+ grpc_slice content_type_hdr;
+ grpc_slice message_pfx;
+ uint8_t* p;
+ uint32_t len = 0;
+ grpc_status_code grpc_status;
+ grpc_slice slice;
+ grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
+ nullptr);
+
+ GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
+
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more
+ elegant solution.
+ It's complicated by the fact that our send machinery would be dead by
+ the time we got around to sending this, so instead we ignore HPACK
+ compression and just write the uncompressed bytes onto the wire. */
+ if (!s->sent_initial_metadata) {
+ http_status_hdr = GRPC_SLICE_MALLOC(13);
+ p = GRPC_SLICE_START_PTR(http_status_hdr);
+ *p++ = 0x00;
+ *p++ = 7;
+ *p++ = ':';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ *p++ = 3;
+ *p++ = '2';
+ *p++ = '0';
+ *p++ = '0';
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
+ len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
+
+ content_type_hdr = GRPC_SLICE_MALLOC(31);
+ p = GRPC_SLICE_START_PTR(content_type_hdr);
+ *p++ = 0x00;
+ *p++ = 12;
+ *p++ = 'c';
+ *p++ = 'o';
+ *p++ = 'n';
+ *p++ = 't';
+ *p++ = 'e';
+ *p++ = 'n';
+ *p++ = 't';
+ *p++ = '-';
+ *p++ = 't';
+ *p++ = 'y';
+ *p++ = 'p';
+ *p++ = 'e';
+ *p++ = 16;
+ *p++ = 'a';
+ *p++ = 'p';
+ *p++ = 'p';
+ *p++ = 'l';
+ *p++ = 'i';
+ *p++ = 'c';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'i';
+ *p++ = 'o';
+ *p++ = 'n';
+ *p++ = '/';
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
+ len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
+ }
+
+ status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
+ p = GRPC_SLICE_START_PTR(status_hdr);
+ *p++ = 0x00; /* literal header, not indexed */
+ *p++ = 11; /* len(grpc-status) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (grpc_status < 10) {
+ *p++ = 1;
+ *p++ = static_cast<uint8_t>('0' + grpc_status);
+ } else {
+ *p++ = 2;
+ *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
+ *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
+ }
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
+ len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
+
+ size_t msg_len = GRPC_SLICE_LENGTH(slice);
+ GPR_ASSERT(msg_len <= UINT32_MAX);
+ uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
+ message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
+ p = GRPC_SLICE_START_PTR(message_pfx);
+ *p++ = 0x00; /* literal header, not indexed */
+ *p++ = 12; /* len(grpc-message) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
+ *p++ = 's';
+ *p++ = 's';
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
+ p += msg_len_len;
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
+ len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
+ len += static_cast<uint32_t>(msg_len);
+
+ hdr = GRPC_SLICE_MALLOC(9);
+ p = GRPC_SLICE_START_PTR(hdr);
+ *p++ = static_cast<uint8_t>(len >> 16);
+ *p++ = static_cast<uint8_t>(len >> 8);
+ *p++ = static_cast<uint8_t>(len);
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = static_cast<uint8_t>(s->id >> 24);
+ *p++ = static_cast<uint8_t>(s->id >> 16);
+ *p++ = static_cast<uint8_t>(s->id >> 8);
+ *p++ = static_cast<uint8_t>(s->id);
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
+
+ grpc_slice_buffer_add(&t->qbuf, hdr);
+ if (!s->sent_initial_metadata) {
+ grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
+ grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
+ }
+ grpc_slice_buffer_add(&t->qbuf, status_hdr);
+ grpc_slice_buffer_add(&t->qbuf, message_pfx);
+ grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
+ grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
+ &s->stats.outgoing);
+
+ grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
+}
+
+typedef struct {
+ grpc_error* error;
+ grpc_chttp2_transport* t;
+} cancel_stream_cb_args;
+
+static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
+ cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
+ grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
+}
+
+static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
+ intptr_t http2_error;
+ // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
+ if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
+ !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
+ cancel_stream_cb_args args = {error, t};
+ grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
+ GRPC_ERROR_UNREF(error);
+}
+
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
+
+template <class F>
+static void WithUrgency(grpc_chttp2_transport* t,
+ grpc_core::chttp2::FlowControlAction::Urgency urgency,
+ grpc_chttp2_initiate_write_reason reason, F action) {
+ switch (urgency) {
+ case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
+ break;
+ case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
+ grpc_chttp2_initiate_write(t, reason);
+ // fallthrough
+ case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
+ action();
+ break;
+ }
+}
+
+void grpc_chttp2_act_on_flowctl_action(
+ const grpc_core::chttp2::FlowControlAction& action,
+ grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
+ WithUrgency(t, action.send_stream_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
+ WithUrgency(t, action.send_transport_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
+ WithUrgency(t, action.send_initial_window_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
+ queue_setting_update(t,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ action.initial_window_size());
+ });
+ WithUrgency(t, action.send_max_frame_size_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
+ queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ action.max_frame_size());
+ });
+}
+
+static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
+ grpc_http_parser parser;
+ size_t i = 0;
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_http_response response;
+
+ grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
+
+ grpc_error* parse_error = GRPC_ERROR_NONE;
+ for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
+ parse_error =
+ grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
+ }
+ if (parse_error == GRPC_ERROR_NONE &&
+ (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
+ error = grpc_error_set_int(
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Trying to connect an http1.x server"),
+ GRPC_ERROR_INT_HTTP_STATUS, response.status),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+ }
+ GRPC_ERROR_UNREF(parse_error);
+
+ grpc_http_parser_destroy(&parser);
+ grpc_http_response_destroy(&response);
+ return error;
+}
+
+static void read_action_locked(void* tp, grpc_error* error) {
+ GPR_TIMER_SCOPE("reading_action_locked", 0);
+
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+
+ GRPC_ERROR_REF(error);
+
+ grpc_error* err = error;
+ if (err != GRPC_ERROR_NONE) {
+ err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Endpoint read failed", &err, 1),
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
+ t->write_state);
+ }
+ GPR_SWAP(grpc_error*, err, error);
+ GRPC_ERROR_UNREF(err);
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
+ GPR_TIMER_SCOPE("reading_action.parse", 0);
+ size_t i = 0;
+ grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
+ GRPC_ERROR_NONE};
+ for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
+ grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
+ if (bdp_est) {
+ bdp_est->AddIncomingBytes(
+ static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
+ }
+ errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
+ }
+ if (errors[1] != GRPC_ERROR_NONE) {
+ errors[2] = try_http_parsing(t);
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
+ }
+ for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
+
+ GPR_TIMER_SCOPE("post_parse_locked", 0);
+ if (t->initial_window_update != 0) {
+ if (t->initial_window_update > 0) {
+ grpc_chttp2_stream* s;
+ while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
+ grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_initiate_write(
+ t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
+ }
+ }
+ t->initial_window_update = 0;
+ }
+ }
+
+ GPR_TIMER_SCOPE("post_reading_action_locked", 0);
+ bool keep_reading = false;
+ if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1);
+ }
+ if (error != GRPC_ERROR_NONE) {
+ /* If a goaway frame was received, this might be the reason why the read
+ * failed. Add this info to the error */
+ if (t->goaway_error != GRPC_ERROR_NONE) {
+ error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
+ }
+
+ close_transport_locked(t, GRPC_ERROR_REF(error));
+ t->endpoint_reading = 0;
+ } else if (t->closed_with_error == GRPC_ERROR_NONE) {
+ keep_reading = true;
+ /* Since we have read a byte, reset the keepalive timer */
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
+ grpc_timer_cancel(&t->keepalive_ping_timer);
+ }
+ }
+ grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
+
+ if (keep_reading) {
+ if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
+ t->reading_paused_on_pending_induced_frames = true;
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_INFO,
+ "transport %p : Pausing reading due to too "
+ "many unwritten SETTINGS ACK and RST_STREAM frames",
+ t));
+ } else {
+ continue_read_action_locked(t);
+ }
+ } else {
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
+ }
+
+ GRPC_ERROR_UNREF(error);
+}
+
+static void continue_read_action_locked(grpc_chttp2_transport* t) {
+ const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
+ grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
+ grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
+}
+
+// t is reffed prior to calling the first time, and once the callback chain
+// that kicks off finishes, it's unreffed
+static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
+ t->flow_control->bdp_estimator()->SchedulePing();
+ send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+}
+
+static void start_bdp_ping_locked(void* tp, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
+ }
+ if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
+ return;
+ }
+ /* Reset the keepalive ping timer */
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
+ grpc_timer_cancel(&t->keepalive_ping_timer);
+ }
+ t->flow_control->bdp_estimator()->StartPing();
+}
+
+static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
+ }
+ if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
+ return;
+ }
+ grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
+ grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
+ nullptr);
+ GPR_ASSERT(!t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = true;
+ grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
+ &t->next_bdp_ping_timer_expired_locked);
+}
+
+static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
+ GPR_ASSERT(t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = false;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
+ return;
+ }
+ schedule_bdp_ping_locked(t);
+}
+
+void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
+ bool is_client) {
+ size_t i;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->args[i], {is_client ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_time_ms = value;
+ } else {
+ g_default_server_keepalive_time_ms = value;
+ }
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
+ if (is_client) {
+ g_default_client_keepalive_timeout_ms = value;
+ } else {
+ g_default_server_keepalive_timeout_ms = value;
+ }
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
+ &args->args[i],
+ {is_client ? g_default_client_keepalive_permit_without_calls
+ : g_default_server_keepalive_timeout_ms,
+ 0, 1}));
+ if (is_client) {
+ g_default_client_keepalive_permit_without_calls = value;
+ } else {
+ g_default_server_keepalive_permit_without_calls = value;
+ }
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
+ g_default_max_ping_strikes = grpc_channel_arg_get_integer(
+ &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ g_default_max_pings_without_data = grpc_channel_arg_get_integer(
+ &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
+ } else if (0 ==
+ strcmp(
+ args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ g_default_min_sent_ping_interval_without_data_ms =
+ grpc_channel_arg_get_integer(
+ &args->args[i],
+ {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
+ } else if (0 ==
+ strcmp(
+ args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ g_default_min_recv_ping_interval_without_data_ms =
+ grpc_channel_arg_get_integer(
+ &args->args[i],
+ {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
+ }
+ }
+ }
+}
+
+static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
+ if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+ } else if (error == GRPC_ERROR_NONE) {
+ if (t->keepalive_permit_without_calls ||
+ grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
+ grpc_timer_init_unset(&t->keepalive_watchdog_timer);
+ send_keepalive_ping_locked(t);
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
+ } else {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(&t->keepalive_ping_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ &t->init_keepalive_ping_locked);
+ }
+ } else if (error == GRPC_ERROR_CANCELLED) {
+ /* The keepalive ping timer may be cancelled by bdp */
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(&t->keepalive_ping_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ &t->init_keepalive_ping_locked);
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
+}
+
+static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ if (error != GRPC_ERROR_NONE) {
+ return;
+ }
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordKeepaliveSent();
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
+ }
+ GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
+ grpc_timer_init(&t->keepalive_watchdog_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
+ &t->keepalive_watchdog_fired_locked);
+}
+
+static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+ if (error == GRPC_ERROR_NONE) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+ gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
+ }
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+ grpc_timer_cancel(&t->keepalive_watchdog_timer);
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(&t->keepalive_ping_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ &t->init_keepalive_ping_locked);
+ }
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
+}
+
+static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
+ t->peer_string);
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+ close_transport_locked(
+ t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "keepalive watchdog timeout"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE));
+ }
+ } else {
+ /* The watchdog timer should have been cancelled by
+ * finish_keepalive_ping_locked. */
+ if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
+ gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
+ t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
+ }
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
+}
+
+/*******************************************************************************
+ * CALLBACK LOOP
+ */
+
+static void connectivity_state_set(grpc_chttp2_transport* t,
+ grpc_connectivity_state state,
+ const char* reason) {
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
+ grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
+ reason);
+}
+
+/*******************************************************************************
+ * POLLSET STUFF
+ */
+
+static void set_pollset(grpc_transport* gt, grpc_stream* gs,
+ grpc_pollset* pollset) {
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ grpc_endpoint_add_to_pollset(t->ep, pollset);
+}
+
+static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
+ grpc_pollset_set* pollset_set) {
+ grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+ grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
+}
+
+/*******************************************************************************
+ * BYTE STREAM
+ */
+
+static void reset_byte_stream(void* arg, grpc_error* error) {
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
+ s->pending_byte_stream = false;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_maybe_complete_recv_message(s->t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
+ } else {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = nullptr;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_NONE;
+ grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
+ s->byte_stream_error = GRPC_ERROR_REF(error);
+ }
+}
+
+namespace grpc_core {
+
+Chttp2IncomingByteStream::Chttp2IncomingByteStream(
+ grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
+ uint32_t frame_size, uint32_t flags)
+ : ByteStream(frame_size, flags),
+ transport_(transport),
+ stream_(stream),
+ refs_(2),
+ remaining_bytes_(frame_size) {
+ GRPC_ERROR_UNREF(stream->byte_stream_error);
+ stream->byte_stream_error = GRPC_ERROR_NONE;
+}
+
+void Chttp2IncomingByteStream::OrphanLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_stream* s = bs->stream_;
+ grpc_chttp2_transport* t = s->t;
+ bs->Unref();
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+}
+
+void Chttp2IncomingByteStream::Orphan() {
+ GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&destroy_action_,
+ &Chttp2IncomingByteStream::OrphanLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+void Chttp2IncomingByteStream::NextLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_transport* t = bs->transport_;
+ grpc_chttp2_stream* s = bs->stream_;
+ size_t cur_length = s->frame_storage.length;
+ if (!s->read_closed) {
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
+ cur_length);
+ grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
+ }
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ if (s->frame_storage.length > 0) {
+ grpc_slice_buffer_swap(&s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_decompressed = false;
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
+ } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != nullptr) {
+ s->data_parser.parsing_frame->Unref();
+ s->data_parser.parsing_frame = nullptr;
+ }
+ } else if (s->read_closed) {
+ if (bs->remaining_bytes_ != 0) {
+ s->byte_stream_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != nullptr) {
+ s->data_parser.parsing_frame->Unref();
+ s->data_parser.parsing_frame = nullptr;
+ }
+ } else {
+ /* Should never reach here. */
+ GPR_ASSERT(false);
+ }
+ } else {
+ s->on_next = bs->next_action_.on_complete;
+ }
+ bs->Unref();
+}
+
+bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
+ GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
+ return true;
+ } else {
+ Ref();
+ next_action_.max_size_hint = max_size_hint;
+ next_action_.on_complete = on_complete;
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&next_action_.closure,
+ &Chttp2IncomingByteStream::NextLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
+ GRPC_ERROR_NONE);
+ return false;
+ }
+}
+
+void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
+ GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
+ if (!stream_->stream_decompression_ctx) {
+ stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
+ stream_->stream_decompression_method);
+ }
+}
+
+grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
+ GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
+ grpc_error* error;
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
+ if (!stream_->unprocessed_incoming_frames_decompressed &&
+ stream_->stream_decompression_method !=
+ GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
+ bool end_of_context;
+ MaybeCreateStreamDecompressionCtx();
+ if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
+ &stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer, nullptr,
+ MAX_SIZE_T, &end_of_context)) {
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
+ return error;
+ }
+ GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer);
+ stream_->unprocessed_incoming_frames_decompressed = true;
+ if (end_of_context) {
+ grpc_stream_compression_context_destroy(
+ stream_->stream_decompression_ctx);
+ stream_->stream_decompression_ctx = nullptr;
+ }
+ if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
+ *slice = grpc_empty_slice();
+ }
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
+ &stream_->data_parser, stream_,
+ &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
+ return error;
+ }
+ return GRPC_ERROR_NONE;
+}
+
+void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
+ stream_->on_next = nullptr;
+ GRPC_ERROR_UNREF(stream_->byte_stream_error);
+ stream_->byte_stream_error = GRPC_ERROR_REF(error);
+ grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
+}
+
+grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
+ grpc_slice* slice_out) {
+ if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
+ grpc_slice_unref_internal(slice);
+ return error;
+ } else {
+ remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
+ if (slice_out != nullptr) {
+ *slice_out = slice;
+ }
+ return GRPC_ERROR_NONE;
+ }
+}
+
+grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
+ bool reset_on_error) {
+ if (error == GRPC_ERROR_NONE) {
+ if (remaining_bytes_ != 0) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ }
+ }
+ if (error != GRPC_ERROR_NONE && reset_on_error) {
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
+ }
+ Unref();
+ return error;
+}
+
+void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
+}
+
+} // namespace grpc_core
+
+/*******************************************************************************
+ * RESOURCE QUOTAS
+ */
+
+static void post_benign_reclaimer(grpc_chttp2_transport* t) {
+ if (!t->benign_reclaimer_registered) {
+ t->benign_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
+ grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
+ false, &t->benign_reclaimer_locked);
+ }
+}
+
+static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
+ if (!t->destructive_reclaimer_registered) {
+ t->destructive_reclaimer_registered = true;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
+ grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
+ true, &t->destructive_reclaimer_locked);
+ }
+}
+
+static void benign_reclaimer_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ if (error == GRPC_ERROR_NONE &&
+ grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ /* Channel with no active streams: send a goaway to try and make it
+ * disconnect cleanly */
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+ gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
+ t->peer_string);
+ }
+ send_goaway(t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+ } else if (error == GRPC_ERROR_NONE &&
+ GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+ gpr_log(GPR_INFO,
+ "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
+ " streams",
+ t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
+ }
+ t->benign_reclaimer_registered = false;
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
+}
+
+static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
+ grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+ size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
+ t->destructive_reclaimer_registered = false;
+ if (error == GRPC_ERROR_NONE && n > 0) {
+ grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
+ grpc_chttp2_stream_map_rand(&t->stream_map));
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+ gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ s->id);
+ }
+ grpc_chttp2_cancel_stream(
+ t, s,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_HTTP2_ENHANCE_YOUR_CALM));
+ if (n > 1) {
+ /* Since we cancel one stream per destructive reclamation, if
+ there are more streams left, we can immediately post a new
+ reclaimer in case the resource quota needs to free more
+ memory */
+ post_destructive_reclaimer(t);
+ }
+ }
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_resource_user_finish_reclamation(
+ grpc_endpoint_get_resource_user(t->ep));
+ }
+ GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
+}
+
+/*******************************************************************************
+ * MONITORING
+ */
+
+const char* grpc_chttp2_initiate_write_reason_string(
+ grpc_chttp2_initiate_write_reason reason) {
+ switch (reason) {
+ case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
+ return "INITIAL_WRITE";
+ case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
+ return "START_NEW_STREAM";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
+ return "SEND_MESSAGE";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
+ return "SEND_INITIAL_METADATA";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
+ return "SEND_TRAILING_METADATA";
+ case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
+ return "RETRY_SEND_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
+ return "CONTINUE_PINGS";
+ case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
+ return "GOAWAY_SENT";
+ case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
+ return "RST_STREAM";
+ case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
+ return "CLOSE_FROM_API";
+ case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
+ return "STREAM_FLOW_CONTROL";
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
+ return "TRANSPORT_FLOW_CONTROL";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
+ return "SEND_SETTINGS";
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
+ return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
+ return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
+ case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
+ return "APPLICATION_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
+ return "KEEPALIVE_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
+ return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
+ case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
+ return "PING_RESPONSE";
+ case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
+ return "FORCE_RST_STREAM";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
+ return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
+}
+
+static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
+ "chttp2",
+ init_stream,
+ set_pollset,
+ set_pollset_set,
+ perform_stream_op,
+ perform_transport_op,
+ destroy_stream,
+ destroy_transport,
+ chttp2_get_endpoint};
+
+static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
+
+grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
+grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
+ grpc_chttp2_transport* t =
+ reinterpret_cast<grpc_chttp2_transport*>(transport);
+ return t->channelz_socket;
+}
+
+grpc_transport* grpc_create_chttp2_transport(
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
+ grpc_resource_user* resource_user) {
+ auto t = grpc_core::New<grpc_chttp2_transport>(channel_args, ep, is_client,
+ resource_user);
+ return &t->base;
+}
+
+void grpc_chttp2_transport_start_reading(
+ grpc_transport* transport, grpc_slice_buffer* read_buffer,
+ grpc_closure* notify_on_receive_settings) {
+ grpc_chttp2_transport* t =
+ reinterpret_cast<grpc_chttp2_transport*>(transport);
+ GRPC_CHTTP2_REF_TRANSPORT(
+ t, "reading_action"); /* matches unref inside reading_action */
+ if (read_buffer != nullptr) {
+ grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
+ gpr_free(read_buffer);
+ }
+ t->notify_on_receive_settings = notify_on_receive_settings;
+ GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
+}