Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / transport / chttp2 / transport / writing.cc
diff --git a/legacy-libs/grpc/deps/grpc/src/core/ext/transport/chttp2/transport/writing.cc b/legacy-libs/grpc/deps/grpc/src/core/ext/transport/chttp2/transport/writing.cc
new file mode 100644 (file)
index 0000000..d6d9e45
--- /dev/null
@@ -0,0 +1,695 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/transport/chttp2/transport/context_list.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
+
+#include <limits.h>
+
+#include <grpc/support/log.h>
+
+#include "src/core/lib/compression/stream_compression.h"
+#include "src/core/lib/debug/stats.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/transport/http2_errors.h"
+
+static void add_to_write_list(grpc_chttp2_write_cb** list,
+                              grpc_chttp2_write_cb* cb) {
+  cb->next = *list;
+  *list = cb;
+}
+
+static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+                            grpc_chttp2_write_cb* cb, grpc_error* error) {
+  grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
+                                    "finish_write_cb");
+  cb->next = t->write_cb_pool;
+  t->write_cb_pool = cb;
+}
+
+static void maybe_initiate_ping(grpc_chttp2_transport* t) {
+  grpc_chttp2_ping_queue* pq = &t->ping_queue;
+  if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+    /* no ping needed: wait */
+    return;
+  }
+  if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+    /* ping already in-flight: wait */
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
+        GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
+      gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
+              t->is_client ? "CLIENT" : "SERVER", t->peer_string);
+    }
+    return;
+  }
+  if (t->ping_state.pings_before_data_required == 0 &&
+      t->ping_policy.max_pings_without_data != 0) {
+    /* need to receive something of substance before sending a ping again */
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
+        GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
+      gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
+              t->is_client ? "CLIENT" : "SERVER", t->peer_string,
+              t->ping_state.pings_before_data_required,
+              t->ping_policy.max_pings_without_data);
+    }
+    return;
+  }
+  grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+
+  grpc_millis next_allowed_ping_interval =
+      (t->keepalive_permit_without_calls == 0 &&
+       grpc_chttp2_stream_map_size(&t->stream_map) == 0)
+          ? 7200 * GPR_MS_PER_SEC
+          : t->ping_policy.min_sent_ping_interval_without_data;
+  grpc_millis next_allowed_ping =
+      t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
+
+  if (next_allowed_ping > now) {
+    /* not enough elapsed time between successive pings */
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
+        GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
+      gpr_log(GPR_INFO,
+              "%s: Ping delayed [%p]: not enough time elapsed since last ping. "
+              " Last ping %f: Next ping %f: Now %f",
+              t->is_client ? "CLIENT" : "SERVER", t->peer_string,
+              static_cast<double>(t->ping_state.last_ping_sent_time),
+              static_cast<double>(next_allowed_ping), static_cast<double>(now));
+    }
+    if (!t->ping_state.is_delayed_ping_timer_set) {
+      t->ping_state.is_delayed_ping_timer_set = true;
+      GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
+      grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
+                      &t->retry_initiate_ping_locked);
+    }
+    return;
+  }
+
+  pq->inflight_id = t->ping_ctr;
+  t->ping_ctr++;
+  GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+  grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+                         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+  grpc_slice_buffer_add(&t->outbuf,
+                        grpc_chttp2_ping_create(false, pq->inflight_id));
+  GRPC_STATS_INC_HTTP2_PINGS_SENT();
+  t->ping_state.last_ping_sent_time = now;
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
+      GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace)) {
+    gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
+            t->is_client ? "CLIENT" : "SERVER", t->peer_string,
+            t->ping_state.pings_before_data_required,
+            t->ping_policy.max_pings_without_data);
+  }
+  t->ping_state.pings_before_data_required -=
+      (t->ping_state.pings_before_data_required != 0);
+}
+
+static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+                        int64_t send_bytes, grpc_chttp2_write_cb** list,
+                        int64_t* ctr, grpc_error* error) {
+  bool sched_any = false;
+  grpc_chttp2_write_cb* cb = *list;
+  *list = nullptr;
+  *ctr += send_bytes;
+  while (cb) {
+    grpc_chttp2_write_cb* next = cb->next;
+    if (cb->call_at_byte <= *ctr) {
+      sched_any = true;
+      finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
+    } else {
+      add_to_write_list(list, cb);
+    }
+    cb = next;
+  }
+  GRPC_ERROR_UNREF(error);
+  return sched_any;
+}
+
+static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+                         const char* staller) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
+    gpr_log(
+        GPR_DEBUG,
+        "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
+        "to happen in a healthy program that is not seeing flow control stalls."
+        " However, if you know that there are unwanted stalls, here is some "
+        "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
+        ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
+        ":s_win=%d:s_delta=%" PRId64 "]",
+        t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
+        s->stream_compression_method ==
+                GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
+            ? 0
+            : s->compressed_data_buffer.length,
+        s->flow_controlled_bytes_flowed,
+        t->settings[GRPC_ACKED_SETTINGS]
+                   [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+        t->flow_control->remote_window(),
+        static_cast<uint32_t> GPR_MAX(
+            0,
+            s->flow_control->remote_window_delta() +
+                (int64_t)t->settings[GRPC_PEER_SETTINGS]
+                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
+        s->flow_control->remote_window_delta());
+  }
+}
+
+/* How many bytes would we like to put on the wire during a single syscall */
+static uint32_t target_write_size(grpc_chttp2_transport* t) {
+  return 1024 * 1024;
+}
+
+// Returns true if initial_metadata contains only default headers.
+static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
+  return initial_metadata->list.default_count == initial_metadata->list.count;
+}
+
+namespace {
+class StreamWriteContext;
+
+class WriteContext {
+ public:
+  WriteContext(grpc_chttp2_transport* t) : t_(t) {
+    GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
+    GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
+  }
+
+  // TODO(ctiller): make this the destructor
+  void FlushStats() {
+    GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
+        initial_metadata_writes_);
+    GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
+    GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
+        trailing_metadata_writes_);
+    GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
+  }
+
+  void FlushSettings() {
+    if (t_->dirtied_local_settings && !t_->sent_local_settings) {
+      grpc_slice_buffer_add(
+          &t_->outbuf, grpc_chttp2_settings_create(
+                           t_->settings[GRPC_SENT_SETTINGS],
+                           t_->settings[GRPC_LOCAL_SETTINGS],
+                           t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
+      t_->force_send_settings = false;
+      t_->dirtied_local_settings = false;
+      t_->sent_local_settings = true;
+      GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
+    }
+  }
+
+  void FlushQueuedBuffers() {
+    /* simple writes are queued to qbuf, and flushed here */
+    grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
+    t_->num_pending_induced_frames = 0;
+    GPR_ASSERT(t_->qbuf.count == 0);
+  }
+
+  void FlushWindowUpdates() {
+    uint32_t transport_announce =
+        t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
+    if (transport_announce) {
+      grpc_transport_one_way_stats throwaway_stats;
+      grpc_slice_buffer_add(
+          &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
+                                                        &throwaway_stats));
+      ResetPingClock();
+    }
+  }
+
+  void FlushPingAcks() {
+    for (size_t i = 0; i < t_->ping_ack_count; i++) {
+      grpc_slice_buffer_add(&t_->outbuf,
+                            grpc_chttp2_ping_create(true, t_->ping_acks[i]));
+    }
+    t_->ping_ack_count = 0;
+  }
+
+  void EnactHpackSettings() {
+    grpc_chttp2_hpack_compressor_set_max_table_size(
+        &t_->hpack_compressor,
+        t_->settings[GRPC_PEER_SETTINGS]
+                    [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+  }
+
+  void UpdateStreamsNoLongerStalled() {
+    grpc_chttp2_stream* s;
+    while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
+      if (t_->closed_with_error == GRPC_ERROR_NONE &&
+          grpc_chttp2_list_add_writable_stream(t_, s)) {
+        if (!s->refcount->refs.RefIfNonZero()) {
+          grpc_chttp2_list_remove_writable_stream(t_, s);
+        }
+      }
+    }
+  }
+
+  grpc_chttp2_stream* NextStream() {
+    if (t_->outbuf.length > target_write_size(t_)) {
+      result_.partial = true;
+      return nullptr;
+    }
+
+    grpc_chttp2_stream* s;
+    if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
+      return nullptr;
+    }
+
+    return s;
+  }
+
+  void ResetPingClock() {
+    if (!t_->is_client) {
+      t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
+      t_->ping_recv_state.ping_strikes = 0;
+    }
+    t_->ping_state.pings_before_data_required =
+        t_->ping_policy.max_pings_without_data;
+  }
+
+  void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
+  void IncWindowUpdateWrites() { ++flow_control_writes_; }
+  void IncMessageWrites() { ++message_writes_; }
+  void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
+
+  void NoteScheduledResults() { result_.early_results_scheduled = true; }
+
+  grpc_chttp2_transport* transport() const { return t_; }
+
+  grpc_chttp2_begin_write_result Result() {
+    result_.writing = t_->outbuf.count > 0;
+    return result_;
+  }
+
+ private:
+  grpc_chttp2_transport* const t_;
+
+  /* stats histogram counters: we increment these throughout this function,
+     and at the end publish to the central stats histograms */
+  int flow_control_writes_ = 0;
+  int initial_metadata_writes_ = 0;
+  int trailing_metadata_writes_ = 0;
+  int message_writes_ = 0;
+  grpc_chttp2_begin_write_result result_ = {false, false, false};
+};
+
+class DataSendContext {
+ public:
+  DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
+                  grpc_chttp2_stream* s)
+      : write_context_(write_context),
+        t_(t),
+        s_(s),
+        sending_bytes_before_(s_->sending_bytes) {}
+
+  uint32_t stream_remote_window() const {
+    return static_cast<uint32_t> GPR_MAX(
+        0, s_->flow_control->remote_window_delta() +
+               (int64_t)t_->settings[GRPC_PEER_SETTINGS]
+                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+  }
+
+  uint32_t max_outgoing() const {
+    return static_cast<uint32_t> GPR_MIN(
+        t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+        GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
+  }
+
+  bool AnyOutgoing() const { return max_outgoing() > 0; }
+
+  void FlushUncompressedBytes() {
+    uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
+        max_outgoing(), s_->flow_controlled_buffer.length);
+    is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
+                     s_->fetching_send_message == nullptr &&
+                     s_->send_trailing_metadata != nullptr &&
+                     grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
+    grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
+                            is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
+    s_->flow_control->SentData(send_bytes);
+    s_->sending_bytes += send_bytes;
+  }
+
+  void FlushCompressedBytes() {
+    GPR_DEBUG_ASSERT(s_->stream_compression_method !=
+                     GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
+
+    uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
+        max_outgoing(), s_->compressed_data_buffer.length);
+    bool is_last_data_frame =
+        (send_bytes == s_->compressed_data_buffer.length &&
+         s_->flow_controlled_buffer.length == 0 &&
+         s_->fetching_send_message == nullptr);
+    if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
+        s_->stream_compression_ctx != nullptr) {
+      if (GPR_UNLIKELY(!grpc_stream_compress(
+              s_->stream_compression_ctx, &s_->flow_controlled_buffer,
+              &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
+              GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
+        gpr_log(GPR_ERROR, "Stream compression failed.");
+      }
+      grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
+      s_->stream_compression_ctx = nullptr;
+      /* After finish, bytes in s->compressed_data_buffer may be
+       * more than max_outgoing. Start another round of the current
+       * while loop so that send_bytes and is_last_data_frame are
+       * recalculated. */
+      return;
+    }
+    is_last_frame_ = is_last_data_frame &&
+                     s_->send_trailing_metadata != nullptr &&
+                     grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
+    grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
+                            is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
+    s_->flow_control->SentData(send_bytes);
+    if (s_->compressed_data_buffer.length == 0) {
+      s_->sending_bytes += s_->uncompressed_data_size;
+    }
+  }
+
+  void CompressMoreBytes() {
+    GPR_DEBUG_ASSERT(s_->stream_compression_method !=
+                     GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
+
+    if (s_->stream_compression_ctx == nullptr) {
+      s_->stream_compression_ctx =
+          grpc_stream_compression_context_create(s_->stream_compression_method);
+    }
+    s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
+    if (GPR_UNLIKELY(!grpc_stream_compress(
+            s_->stream_compression_ctx, &s_->flow_controlled_buffer,
+            &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
+            GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
+      gpr_log(GPR_ERROR, "Stream compression failed.");
+    }
+  }
+
+  bool is_last_frame() const { return is_last_frame_; }
+
+  void CallCallbacks() {
+    if (update_list(
+            t_, s_,
+            static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
+            &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
+            GRPC_ERROR_NONE)) {
+      write_context_->NoteScheduledResults();
+    }
+  }
+
+ private:
+  WriteContext* write_context_;
+  grpc_chttp2_transport* t_;
+  grpc_chttp2_stream* s_;
+  const size_t sending_bytes_before_;
+  bool is_last_frame_ = false;
+};
+
+class StreamWriteContext {
+ public:
+  StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
+      : write_context_(write_context), t_(write_context->transport()), s_(s) {
+    GRPC_CHTTP2_IF_TRACING(
+        gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
+                t_->is_client ? "CLIENT" : "SERVER", s->id,
+                s->sent_initial_metadata, s->send_initial_metadata != nullptr,
+                (int)(s->flow_control->local_window_delta() -
+                      s->flow_control->announced_window_delta())));
+  }
+
+  void FlushInitialMetadata() {
+    /* send initial metadata if it's available */
+    if (s_->sent_initial_metadata) return;
+    if (s_->send_initial_metadata == nullptr) return;
+
+    // We skip this on the server side if there is no custom initial
+    // metadata, there are no messages to send, and we are also sending
+    // trailing metadata.  This results in a Trailers-Only response,
+    // which is required for retries, as per:
+    // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
+    if (!t_->is_client && s_->fetching_send_message == nullptr &&
+        s_->flow_controlled_buffer.length == 0 &&
+        compressed_data_buffer_len() == 0 &&
+        s_->send_trailing_metadata != nullptr &&
+        is_default_initial_metadata(s_->send_initial_metadata)) {
+      ConvertInitialMetadataToTrailingMetadata();
+    } else {
+      grpc_encode_header_options hopt = {
+          s_->id,  // stream_id
+          false,   // is_eof
+          t_->settings[GRPC_PEER_SETTINGS]
+                      [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
+              0,  // use_true_binary_metadata
+          t_->settings[GRPC_PEER_SETTINGS]
+                      [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
+          &s_->stats.outgoing                                 // stats
+      };
+      grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
+                                s_->send_initial_metadata, &hopt, &t_->outbuf);
+      write_context_->ResetPingClock();
+      write_context_->IncInitialMetadataWrites();
+    }
+
+    s_->send_initial_metadata = nullptr;
+    s_->sent_initial_metadata = true;
+    write_context_->NoteScheduledResults();
+    grpc_chttp2_complete_closure_step(
+        t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
+        "send_initial_metadata_finished");
+  }
+
+  bool compressed_data_buffer_len() {
+    return s_->stream_compression_method ==
+                   GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
+               ? 0
+               : s_->compressed_data_buffer.length;
+  }
+
+  void FlushWindowUpdates() {
+    /* send any window updates */
+    const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
+    if (stream_announce == 0) return;
+
+    grpc_slice_buffer_add(
+        &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
+                                                      &s_->stats.outgoing));
+    write_context_->ResetPingClock();
+    write_context_->IncWindowUpdateWrites();
+  }
+
+  void FlushData() {
+    if (!s_->sent_initial_metadata) return;
+
+    if (s_->flow_controlled_buffer.length == 0 &&
+        compressed_data_buffer_len() == 0) {
+      return;  // early out: nothing to do
+    }
+
+    DataSendContext data_send_context(write_context_, t_, s_);
+
+    if (!data_send_context.AnyOutgoing()) {
+      if (t_->flow_control->remote_window() <= 0) {
+        report_stall(t_, s_, "transport");
+        grpc_chttp2_list_add_stalled_by_transport(t_, s_);
+      } else if (data_send_context.stream_remote_window() <= 0) {
+        report_stall(t_, s_, "stream");
+        grpc_chttp2_list_add_stalled_by_stream(t_, s_);
+      }
+      return;  // early out: nothing to do
+    }
+
+    if (s_->stream_compression_method ==
+        GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
+      while (s_->flow_controlled_buffer.length > 0 &&
+             data_send_context.max_outgoing() > 0) {
+        data_send_context.FlushUncompressedBytes();
+      }
+    } else {
+      while ((s_->flow_controlled_buffer.length > 0 ||
+              s_->compressed_data_buffer.length > 0) &&
+             data_send_context.max_outgoing() > 0) {
+        if (s_->compressed_data_buffer.length > 0) {
+          data_send_context.FlushCompressedBytes();
+        } else {
+          data_send_context.CompressMoreBytes();
+        }
+      }
+    }
+    write_context_->ResetPingClock();
+    if (data_send_context.is_last_frame()) {
+      SentLastFrame();
+    }
+    data_send_context.CallCallbacks();
+    stream_became_writable_ = true;
+    if (s_->flow_controlled_buffer.length > 0 ||
+        compressed_data_buffer_len() > 0) {
+      GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
+      grpc_chttp2_list_add_writable_stream(t_, s_);
+    }
+    write_context_->IncMessageWrites();
+  }
+
+  void FlushTrailingMetadata() {
+    if (!s_->sent_initial_metadata) return;
+
+    if (s_->send_trailing_metadata == nullptr) return;
+    if (s_->fetching_send_message != nullptr) return;
+    if (s_->flow_controlled_buffer.length != 0) return;
+    if (compressed_data_buffer_len() != 0) return;
+
+    GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
+    if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
+      grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
+                              &s_->stats.outgoing, &t_->outbuf);
+    } else {
+      grpc_encode_header_options hopt = {
+          s_->id, true,
+          t_->settings[GRPC_PEER_SETTINGS]
+                      [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
+              0,
+
+          t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+          &s_->stats.outgoing};
+      grpc_chttp2_encode_header(&t_->hpack_compressor,
+                                extra_headers_for_trailing_metadata_,
+                                num_extra_headers_for_trailing_metadata_,
+                                s_->send_trailing_metadata, &hopt, &t_->outbuf);
+    }
+    write_context_->IncTrailingMetadataWrites();
+    write_context_->ResetPingClock();
+    SentLastFrame();
+
+    write_context_->NoteScheduledResults();
+    grpc_chttp2_complete_closure_step(
+        t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
+        "send_trailing_metadata_finished");
+  }
+
+  bool stream_became_writable() { return stream_became_writable_; }
+
+ private:
+  void ConvertInitialMetadataToTrailingMetadata() {
+    GRPC_CHTTP2_IF_TRACING(
+        gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
+    // When sending Trailers-Only, we need to move the :status and
+    // content-type headers to the trailers.
+    if (s_->send_initial_metadata->idx.named.status != nullptr) {
+      extra_headers_for_trailing_metadata_
+          [num_extra_headers_for_trailing_metadata_++] =
+              &s_->send_initial_metadata->idx.named.status->md;
+    }
+    if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
+      extra_headers_for_trailing_metadata_
+          [num_extra_headers_for_trailing_metadata_++] =
+              &s_->send_initial_metadata->idx.named.content_type->md;
+    }
+  }
+
+  void SentLastFrame() {
+    s_->send_trailing_metadata = nullptr;
+    s_->sent_trailing_metadata = true;
+    s_->eos_sent = true;
+
+    if (!t_->is_client && !s_->read_closed) {
+      grpc_slice_buffer_add(
+          &t_->outbuf, grpc_chttp2_rst_stream_create(
+                           s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
+    }
+    grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
+                                   GRPC_ERROR_NONE);
+  }
+
+  WriteContext* const write_context_;
+  grpc_chttp2_transport* const t_;
+  grpc_chttp2_stream* const s_;
+  bool stream_became_writable_ = false;
+  grpc_mdelem* extra_headers_for_trailing_metadata_[2];
+  size_t num_extra_headers_for_trailing_metadata_ = 0;
+};
+}  // namespace
+
+grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
+    grpc_chttp2_transport* t) {
+  WriteContext ctx(t);
+  ctx.FlushSettings();
+  ctx.FlushPingAcks();
+  ctx.FlushQueuedBuffers();
+  ctx.EnactHpackSettings();
+
+  if (t->flow_control->remote_window() > 0) {
+    ctx.UpdateStreamsNoLongerStalled();
+  }
+
+  /* for each grpc_chttp2_stream that's become writable, frame it's data
+     (according to available window sizes) and add to the output buffer */
+  while (grpc_chttp2_stream* s = ctx.NextStream()) {
+    StreamWriteContext stream_ctx(&ctx, s);
+    size_t orig_len = t->outbuf.length;
+    stream_ctx.FlushInitialMetadata();
+    stream_ctx.FlushWindowUpdates();
+    stream_ctx.FlushData();
+    stream_ctx.FlushTrailingMetadata();
+    if (t->outbuf.length > orig_len) {
+      /* Add this stream to the list of the contexts to be traced at TCP */
+      s->byte_counter += t->outbuf.length - orig_len;
+      if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
+        grpc_core::ContextList::Append(&t->cl, s);
+      }
+    }
+    if (stream_ctx.stream_became_writable()) {
+      if (!grpc_chttp2_list_add_writing_stream(t, s)) {
+        /* already in writing list: drop ref */
+        GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
+      } else {
+        /* ref will be dropped at end of write */
+      }
+    } else {
+      GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
+    }
+  }
+
+  ctx.FlushWindowUpdates();
+
+  maybe_initiate_ping(t);
+
+  return ctx.Result();
+}
+
+void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
+  GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
+  grpc_chttp2_stream* s;
+
+  if (t->channelz_socket != nullptr) {
+    t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
+  }
+  t->num_messages_in_next_write = 0;
+
+  while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
+    if (s->sending_bytes != 0) {
+      update_list(t, s, static_cast<int64_t>(s->sending_bytes),
+                  &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
+                  GRPC_ERROR_REF(error));
+      s->sending_bytes = 0;
+    }
+    GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
+  }
+  grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
+  GRPC_ERROR_UNREF(error);
+}