3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/string_util.h>
32 #include "src/core/ext/transport/chttp2/transport/internal.h"
33 #include "src/core/lib/gpr/string.h"
35 grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
42 static constexpr const int kTracePadding = 30;
43 static constexpr const uint32_t kMaxWindowUpdateSize = (1u << 31) - 1;
45 static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
47 if (old_val != new_val) {
48 gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val);
50 gpr_asprintf(&str, "%" PRId64 "", old_val);
52 char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
57 static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
59 if (old_val != new_val) {
60 gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
62 gpr_asprintf(&str, "%" PRIu32 "", old_val);
64 char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
70 void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
71 StreamFlowControl* sfc) {
75 remote_window_ = tfc->remote_window();
76 target_window_ = tfc->target_window();
77 announced_window_ = tfc->announced_window();
79 remote_window_delta_ = sfc->remote_window_delta();
80 local_window_delta_ = sfc->local_window_delta();
81 announced_window_delta_ = sfc->announced_window_delta();
85 void FlowControlTrace::Finish() {
86 uint32_t acked_local_window =
87 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
88 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
89 uint32_t remote_window =
90 tfc_->transport()->settings[GRPC_PEER_SETTINGS]
91 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
92 char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
93 char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
95 fmt_int64_diff_str(announced_window_, tfc_->announced_window());
99 if (sfc_ != nullptr) {
100 srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
101 sfc_->remote_window_delta() + remote_window);
103 fmt_int64_diff_str(local_window_delta_ + acked_local_window,
104 sfc_->local_window_delta() + acked_local_window);
106 fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
107 sfc_->announced_window_delta() + acked_local_window);
109 srw_str = gpr_leftpad("", ' ', kTracePadding);
110 slw_str = gpr_leftpad("", ' ', kTracePadding);
111 saw_str = gpr_leftpad("", ' ', kTracePadding);
114 "%p[%u][%s] | %s | trw:%s, tlw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
115 tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
116 tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
117 tlw_str, taw_str, srw_str, slw_str, saw_str);
126 const char* FlowControlAction::UrgencyString(Urgency u) {
128 case Urgency::NO_ACTION_NEEDED:
130 case Urgency::UPDATE_IMMEDIATELY:
131 return "update immediately";
132 case Urgency::QUEUE_UPDATE:
133 return "queue update";
135 GPR_UNREACHABLE_CODE(return "unknown");
137 GPR_UNREACHABLE_CODE(return "unknown");
140 void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
141 char* iw_str = fmt_uint32_diff_str(
142 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
143 initial_window_size_);
144 char* mf_str = fmt_uint32_diff_str(
145 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
147 gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
148 UrgencyString(send_transport_update_),
149 UrgencyString(send_stream_update_),
150 UrgencyString(send_initial_window_update_), iw_str,
151 UrgencyString(send_max_frame_size_update_), mf_str);
156 TransportFlowControlDisabled::TransportFlowControlDisabled(
157 grpc_chttp2_transport* t) {
158 remote_window_ = kMaxWindow;
159 target_initial_window_size_ = kMaxWindow;
160 announced_window_ = kMaxWindow;
161 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
163 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
165 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
167 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
169 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
171 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
175 TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
176 bool enable_bdp_probe)
178 enable_bdp_probe_(enable_bdp_probe),
179 bdp_estimator_(t->peer_string),
180 pid_controller_(grpc_core::PidController::Args()
184 .set_initial_control_value(TargetLogBdp())
185 .set_min_control_value(-1)
186 .set_max_control_value(25)
187 .set_integral_range(10)),
188 last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {}
190 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
191 FlowControlTrace trace("t updt sent", this, nullptr);
192 const uint32_t target_announced_window =
193 static_cast<uint32_t>(target_window());
194 if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
195 announced_window_ != target_announced_window) {
196 const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
197 target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
198 announced_window_ += announce;
204 grpc_error* TransportFlowControl::ValidateRecvData(
205 int64_t incoming_frame_size) {
206 if (incoming_frame_size > announced_window_) {
209 "frame of size %" PRId64 " overflows local window of %" PRId64,
210 incoming_frame_size, announced_window_);
211 grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
215 return GRPC_ERROR_NONE;
218 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
219 const grpc_chttp2_stream* s)
220 : tfc_(tfc), s_(s) {}
222 grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
223 FlowControlTrace trace(" data recv", tfc_, this);
225 grpc_error* error = GRPC_ERROR_NONE;
226 error = tfc_->ValidateRecvData(incoming_frame_size);
227 if (error != GRPC_ERROR_NONE) return error;
229 uint32_t sent_init_window =
230 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
231 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
232 uint32_t acked_init_window =
233 tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
234 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
236 int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
237 int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
238 if (incoming_frame_size > acked_stream_window) {
239 if (incoming_frame_size <= sent_stream_window) {
241 "Incoming frame of size %" PRId64
242 " exceeds local window size of %" PRId64
244 "The (un-acked, future) window size would be %" PRId64
245 " which is not exceeded.\n"
246 "This would usually cause a disconnection, but allowing it due to"
247 "broken HTTP2 implementations in the wild.\n"
248 "See (for example) https://github.com/netty/netty/issues/6520.",
249 incoming_frame_size, acked_stream_window, sent_stream_window);
253 &msg, "frame of size %" PRId64 " overflows local window of %" PRId64,
254 incoming_frame_size, acked_stream_window);
255 grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
261 UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
262 local_window_delta_ -= incoming_frame_size;
263 tfc_->CommitRecvData(incoming_frame_size);
264 return GRPC_ERROR_NONE;
267 uint32_t StreamFlowControl::MaybeSendUpdate() {
268 FlowControlTrace trace("s updt sent", tfc_, this);
269 if (local_window_delta_ > announced_window_delta_) {
270 uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
271 local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
272 UpdateAnnouncedWindowDelta(tfc_, announce);
278 void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
279 size_t have_already) {
280 FlowControlTrace trace("app st recv", tfc_, this);
281 uint32_t max_recv_bytes;
282 uint32_t sent_init_window =
283 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
284 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
286 /* clamp max recv hint to an allowable size */
287 if (max_size_hint >= UINT32_MAX - sent_init_window) {
288 max_recv_bytes = UINT32_MAX - sent_init_window;
290 max_recv_bytes = static_cast<uint32_t>(max_size_hint);
293 /* account for bytes already received but unknown to higher layers */
294 if (max_recv_bytes >= have_already) {
295 max_recv_bytes -= static_cast<uint32_t>(have_already);
300 /* add some small lookahead to keep pipelines flowing */
301 GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
302 if (local_window_delta_ < max_recv_bytes) {
303 uint32_t add_max_recv_bytes =
304 static_cast<uint32_t>(max_recv_bytes - local_window_delta_);
305 local_window_delta_ += add_max_recv_bytes;
309 // Take in a target and modifies it based on the memory pressure of the system
310 static double AdjustForMemoryPressure(grpc_resource_quota* quota,
312 // do not increase window under heavy memory pressure.
313 double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
314 static const double kLowMemPressure = 0.1;
315 static const double kZeroTarget = 22;
316 static const double kHighMemPressure = 0.8;
317 static const double kMaxMemPressure = 0.9;
318 if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
319 target = (target - kZeroTarget) * memory_pressure / kLowMemPressure +
321 } else if (memory_pressure > kHighMemPressure) {
322 target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) /
323 (kMaxMemPressure - kHighMemPressure));
328 double TransportFlowControl::TargetLogBdp() {
329 return AdjustForMemoryPressure(
330 grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
331 1 + log2(bdp_estimator_.EstimateBdp()));
334 double TransportFlowControl::SmoothLogBdp(double value) {
335 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
336 double bdp_error = value - pid_controller_.last_control_value();
337 const double dt = static_cast<double>(now - last_pid_update_) * 1e-3;
338 last_pid_update_ = now;
340 const double kMaxDt = 0.1;
341 return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
344 FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
345 int64_t value, grpc_chttp2_setting_id setting_id) {
346 int64_t delta = value - static_cast<int64_t>(
347 t_->settings[GRPC_LOCAL_SETTINGS][setting_id]);
348 // TODO(ncteisen): tune this
349 if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
350 return FlowControlAction::Urgency::QUEUE_UPDATE;
352 return FlowControlAction::Urgency::NO_ACTION_NEEDED;
356 FlowControlAction TransportFlowControl::PeriodicUpdate() {
357 FlowControlAction action;
358 if (enable_bdp_probe_) {
359 // get bdp estimate and update initial_window accordingly.
360 // target might change based on how much memory pressure we are under
361 // TODO(ncteisen): experiment with setting target to be huge under low
363 const double target = pow(2, SmoothLogBdp(TargetLogBdp()));
365 // Though initial window 'could' drop to 0, we keep the floor at 128
366 target_initial_window_size_ =
367 static_cast<int32_t> GPR_CLAMP(target, 128, INT32_MAX);
369 action.set_send_initial_window_update(
370 DeltaUrgency(target_initial_window_size_,
371 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
372 static_cast<uint32_t>(target_initial_window_size_));
374 // get bandwidth estimate and update max_frame accordingly.
375 double bw_dbl = bdp_estimator_.EstimateBandwidth();
376 // we target the max of BDP or bandwidth in microseconds.
377 int32_t frame_size = static_cast<int32_t> GPR_CLAMP(
378 GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
379 target_initial_window_size_),
381 action.set_send_max_frame_size_update(
382 DeltaUrgency(static_cast<int64_t>(frame_size),
383 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
386 return UpdateAction(action);
389 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
390 // TODO(ncteisen): tune this
391 if (!s_->read_closed) {
392 uint32_t sent_init_window =
393 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
394 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
395 if (local_window_delta_ > announced_window_delta_ &&
396 announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
397 action.set_send_stream_update(
398 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
399 } else if (local_window_delta_ > announced_window_delta_) {
400 action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
407 } // namespace chttp2
408 } // namespace grpc_core