3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
22 #include <grpc/support/port_platform.h>
27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include "src/core/ext/transport/chttp2/transport/frame.h"
29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
39 #include "src/core/lib/channel/channelz.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/combiner.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/lib/transport/transport_impl.h"
52 /* streams are kept in various linked lists depending on what things need to
53 happen to them... this enum labels each list */
55 GRPC_CHTTP2_LIST_WRITABLE,
56 GRPC_CHTTP2_LIST_WRITING,
57 GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
58 GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
59 /** streams that are waiting to start because there are too many concurrent
60 streams on the connection */
61 GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
62 STREAM_LIST_COUNT /* must be last */
63 } grpc_chttp2_stream_list_id;
66 GRPC_CHTTP2_WRITE_STATE_IDLE,
67 GRPC_CHTTP2_WRITE_STATE_WRITING,
68 GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
69 } grpc_chttp2_write_state;
72 GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
73 GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
74 } grpc_chttp2_optimization_target;
77 GRPC_CHTTP2_PCL_INITIATE = 0,
79 GRPC_CHTTP2_PCL_INFLIGHT,
80 GRPC_CHTTP2_PCL_COUNT /* must be last */
81 } grpc_chttp2_ping_closure_list;
84 GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
85 GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
86 GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
87 GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
88 GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
89 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
90 GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
91 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
92 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
93 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
94 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
95 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
96 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
97 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
98 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
99 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
100 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
101 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
102 GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
103 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
104 } grpc_chttp2_initiate_write_reason;
/** Maps a grpc_chttp2_initiate_write_reason value to a C string.
    NOTE(review): presumably a static, human-readable name meant for
    tracing/logging; ownership of the returned pointer is not visible from this
    declaration -- confirm against the implementation. */
106 const char* grpc_chttp2_initiate_write_reason_string(
107 grpc_chttp2_initiate_write_reason reason);
110 grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
111 uint64_t inflight_id = 0;
112 } grpc_chttp2_ping_queue;
115 int max_pings_without_data;
116 int max_ping_strikes;
117 grpc_millis min_sent_ping_interval_without_data;
118 grpc_millis min_recv_ping_interval_without_data;
119 } grpc_chttp2_repeated_ping_policy;
122 grpc_millis last_ping_sent_time;
123 int pings_before_data_required;
124 grpc_timer delayed_ping_timer;
125 bool is_delayed_ping_timer_set;
126 } grpc_chttp2_repeated_ping_state;
129 grpc_millis last_ping_recv_time;
131 } grpc_chttp2_server_ping_recv_state;
133 /* deframer state for the overall http2 stream of bytes */
135 /* prefix: one entry per http2 connection prefix byte */
136 GRPC_DTS_CLIENT_PREFIX_0 = 0,
137 GRPC_DTS_CLIENT_PREFIX_1,
138 GRPC_DTS_CLIENT_PREFIX_2,
139 GRPC_DTS_CLIENT_PREFIX_3,
140 GRPC_DTS_CLIENT_PREFIX_4,
141 GRPC_DTS_CLIENT_PREFIX_5,
142 GRPC_DTS_CLIENT_PREFIX_6,
143 GRPC_DTS_CLIENT_PREFIX_7,
144 GRPC_DTS_CLIENT_PREFIX_8,
145 GRPC_DTS_CLIENT_PREFIX_9,
146 GRPC_DTS_CLIENT_PREFIX_10,
147 GRPC_DTS_CLIENT_PREFIX_11,
148 GRPC_DTS_CLIENT_PREFIX_12,
149 GRPC_DTS_CLIENT_PREFIX_13,
150 GRPC_DTS_CLIENT_PREFIX_14,
151 GRPC_DTS_CLIENT_PREFIX_15,
152 GRPC_DTS_CLIENT_PREFIX_16,
153 GRPC_DTS_CLIENT_PREFIX_17,
154 GRPC_DTS_CLIENT_PREFIX_18,
155 GRPC_DTS_CLIENT_PREFIX_19,
156 GRPC_DTS_CLIENT_PREFIX_20,
157 GRPC_DTS_CLIENT_PREFIX_21,
158 GRPC_DTS_CLIENT_PREFIX_22,
159 GRPC_DTS_CLIENT_PREFIX_23,
160 /* frame header byte 0... */
161 /* must follow from the prefix states */
170 /* ... frame header byte 8 */
172 /* inside a http2 frame */
174 } grpc_chttp2_deframe_transport_state;
177 grpc_chttp2_stream* head;
178 grpc_chttp2_stream* tail;
179 } grpc_chttp2_stream_list;
182 grpc_chttp2_stream* next;
183 grpc_chttp2_stream* prev;
184 } grpc_chttp2_stream_link;
186 /* We keep several sets of connection wide parameters */
188 /* The settings our peer has asked for (and we have acked) */
189 GRPC_PEER_SETTINGS = 0,
190 /* The settings we'd like to have */
192 /* The settings we've published to our peer */
194 /* The settings the peer has acked */
196 GRPC_NUM_SETTING_SETS
197 } grpc_chttp2_setting_set;
200 GRPC_CHTTP2_NO_GOAWAY_SEND,
201 GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
202 GRPC_CHTTP2_GOAWAY_SENT,
203 } grpc_chttp2_sent_goaway_state;
/** A write callback record: pairs a closure with a byte position and chains to
    another record through `next`, forming a singly linked list. */
205 typedef struct grpc_chttp2_write_cb {
/* NOTE(review): presumably the byte offset at which `closure` should be run --
   confirm against the code that schedules these records. */
206 int64_t call_at_byte;
/* closure associated with this record */
207 grpc_closure* closure;
/* next record in the chain, if any */
208 struct grpc_chttp2_write_cb* next;
209 } grpc_chttp2_write_cb;
211 namespace grpc_core {
213 class Chttp2IncomingByteStream : public ByteStream {
215 Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
216 grpc_chttp2_stream* stream, uint32_t frame_size,
219 void Orphan() override;
221 bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
222 grpc_error* Pull(grpc_slice* slice) override;
223 void Shutdown(grpc_error* error) override;
225 // TODO(roth): When I converted this class to C++, I wanted to make it
226 // inherit from RefCounted or InternallyRefCounted instead of continuing
227 // to use its own custom ref-counting code. However, that would require
228 // using multiple inheritance, which sucks in general. And to make matters
229 // worse, it causes problems with our New<> and Delete<> wrappers.
230 // Specifically, unless RefCounted is first in the list of parent classes,
231 // it will see a different value of the address of the object than the one
232 // we actually allocated, in which case gpr_free() will be called on a
233 // different address than the one we got from gpr_malloc(), thus causing a
234 // crash. Given the fragility of depending on that, as well as a desire to
235 // avoid multiple inheritance in general, I've decided to leave this
236 // alone for now. We can revisit this once we're able to link against
237 // libc++, at which point we can eliminate New<> and Delete<> and
238 // switch to std::shared_ptr<>.
239 void Ref() { refs_.Ref(); }
241 if (GPR_UNLIKELY(refs_.Unref())) {
242 grpc_core::Delete(this);
246 void PublishError(grpc_error* error);
248 grpc_error* Push(const grpc_slice& slice, grpc_slice* slice_out);
250 grpc_error* Finished(grpc_error* error, bool reset_on_error);
252 uint32_t remaining_bytes() const { return remaining_bytes_; }
255 static void NextLocked(void* arg, grpc_error* error_ignored);
256 static void OrphanLocked(void* arg, grpc_error* error_ignored);
258 void MaybeCreateStreamDecompressionCtx();
260 grpc_chttp2_transport* transport_; // Immutable.
261 grpc_chttp2_stream* stream_; // Immutable.
263 grpc_core::RefCount refs_;
265 /* Accessed only by transport thread when stream->pending_byte_stream == false
266 * Accessed only by application thread when stream->pending_byte_stream ==
268 uint32_t remaining_bytes_;
270 /* Accessed only by transport thread when stream->pending_byte_stream == false
271 * Accessed only by application thread when stream->pending_byte_stream ==
274 grpc_closure closure;
275 size_t max_size_hint;
276 grpc_closure* on_complete;
278 grpc_closure destroy_action_;
281 } // namespace grpc_core
284 GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
285 GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
286 GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
287 GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
288 } grpc_chttp2_keepalive_state;
290 struct grpc_chttp2_transport {
291 grpc_chttp2_transport(const grpc_channel_args* channel_args,
292 grpc_endpoint* ep, bool is_client,
293 grpc_resource_user* resource_user);
294 ~grpc_chttp2_transport();
296 grpc_transport base; /* must be first */
297 grpc_core::RefCount refs;
301 grpc_resource_user* resource_user;
303 grpc_combiner* combiner;
305 grpc_closure* notify_on_receive_settings = nullptr;
307 /** write execution state of the transport */
308 grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
309 /** is this the first write in a series of writes?
310 set when we initiate writing from idle, cleared when we
311 initiate writing from writing+more */
312 bool is_first_write_in_batch = false;
314 /** is the transport destroying itself? */
315 uint8_t destroying = false;
316 /** has the upper layer closed the transport? */
317 grpc_error* closed_with_error = GRPC_ERROR_NONE;
319 /** is there a read request to the endpoint outstanding? */
320 uint8_t endpoint_reading = 1;
322 grpc_chttp2_optimization_target opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
324 /** various lists of streams */
325 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
327 /** maps stream id to grpc_chttp2_stream objects */
328 grpc_chttp2_stream_map stream_map;
330 grpc_closure write_action_begin_locked;
331 grpc_closure write_action;
332 grpc_closure write_action_end_locked;
334 grpc_closure read_action_locked;
336 /** incoming read bytes */
337 grpc_slice_buffer read_buffer;
339 /** address to place a newly accepted stream - set and unset by
340 grpc_chttp2_parsing_accept_stream; used by init_stream to
341 publish the accepted server stream */
342 grpc_chttp2_stream** accepting_stream = nullptr;
345 /* accept stream callback */
346 void (*accept_stream)(void* user_data, grpc_transport* transport,
347 const void* server_data);
348 void* accept_stream_user_data;
350 /** connectivity tracking */
351 grpc_connectivity_state_tracker state_tracker;
354 /** data to write now */
355 grpc_slice_buffer outbuf;
356 /** hpack encoding */
357 grpc_chttp2_hpack_compressor hpack_compressor;
358 /** is this a client? */
361 /** data to write next write */
362 grpc_slice_buffer qbuf;
364 /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
366 uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
368 /** Set to a grpc_error object if a goaway frame is received. By default, set
369 * to GRPC_ERROR_NONE */
370 grpc_error* goaway_error = GRPC_ERROR_NONE;
372 grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
374 /** are the local settings dirty and need to be sent? */
375 bool dirtied_local_settings = true;
376 /** have local settings been sent? */
377 bool sent_local_settings = false;
378 /** bitmask of setting indexes to send out
379 Hack: it's common for implementations to assume 65536 bytes initial send
380 window -- this should by rights be 0 */
381 uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
382 /** settings values */
383 uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
385 /** what is the next stream id to be allocated by this peer?
386 copied to next_stream_id in parsing when parsing commences */
387 uint32_t next_stream_id = 0;
389 /** last new stream id */
390 uint32_t last_new_stream_id = 0;
392 /** ping queues for various ping insertion points */
393 grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
394 grpc_chttp2_repeated_ping_policy ping_policy;
395 grpc_chttp2_repeated_ping_state ping_state;
396 uint64_t ping_ctr = 0; /* unique id for pings */
397 grpc_closure retry_initiate_ping_locked;
400 size_t ping_ack_count = 0;
401 size_t ping_ack_capacity = 0;
402 uint64_t* ping_acks = nullptr;
403 grpc_chttp2_server_ping_recv_state ping_recv_state;
405 /** parser for headers */
406 grpc_chttp2_hpack_parser hpack_parser;
407 /** simple one shot parsers */
409 grpc_chttp2_window_update_parser window_update;
410 grpc_chttp2_settings_parser settings;
411 grpc_chttp2_ping_parser ping;
412 grpc_chttp2_rst_stream_parser rst_stream;
414 /** parser for goaway frames */
415 grpc_chttp2_goaway_parser goaway_parser;
417 grpc_core::PolymorphicManualConstructor<
418 grpc_core::chttp2::TransportFlowControlBase,
419 grpc_core::chttp2::TransportFlowControl,
420 grpc_core::chttp2::TransportFlowControlDisabled>
422 /** initial window change. This is tracked as we parse settings frames from
423 * the remote peer. If there is a positive delta, then we will make all
424 * streams readable since they may have become unstalled */
425 int64_t initial_window_update = 0;
428 grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
429 uint8_t incoming_frame_type = 0;
430 uint8_t incoming_frame_flags = 0;
431 uint8_t header_eof = 0;
432 bool is_first_frame = true;
433 uint32_t expect_continuation_stream_id = 0;
434 uint32_t incoming_frame_size = 0;
435 uint32_t incoming_stream_id = 0;
438 void* parser_data = nullptr;
439 grpc_chttp2_stream* incoming_stream = nullptr;
440 grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
441 grpc_chttp2_stream* s, const grpc_slice& slice,
444 grpc_chttp2_write_cb* write_cb_pool = nullptr;
447 grpc_closure next_bdp_ping_timer_expired_locked;
448 grpc_closure start_bdp_ping_locked;
449 grpc_closure finish_bdp_ping_locked;
451 /* if non-NULL, close the transport with this error when writes are finished
453 grpc_error* close_transport_on_writes_finished = GRPC_ERROR_NONE;
455 /* a list of closures to run after writes are finished */
456 grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
458 /* buffer pool state */
459 /** have we scheduled a benign cleanup? */
460 bool benign_reclaimer_registered = false;
461 /** have we scheduled a destructive cleanup? */
462 bool destructive_reclaimer_registered = false;
463 /** benign cleanup closure */
464 grpc_closure benign_reclaimer_locked;
465 /** destructive cleanup closure */
466 grpc_closure destructive_reclaimer_locked;
468 /* next bdp ping timer */
469 bool have_next_bdp_ping_timer = false;
470 grpc_timer next_bdp_ping_timer;
472 /* keep-alive ping support */
473 /** Closure to initialize a keepalive ping */
474 grpc_closure init_keepalive_ping_locked;
475 /** Closure to run when the keepalive ping is sent */
476 grpc_closure start_keepalive_ping_locked;
477 /** Closure to run when the keepalive ping ack is received */
478 grpc_closure finish_keepalive_ping_locked;
479 /** Closure to run when the keepalive ping times out */
480 grpc_closure keepalive_watchdog_fired_locked;
481 /** timer to initiate ping events */
482 grpc_timer keepalive_ping_timer;
483 /** watchdog to kill the transport when waiting for the keepalive ping */
484 grpc_timer keepalive_watchdog_timer;
485 /** time duration in between pings */
486 grpc_millis keepalive_time;
487 /** grace period for a ping to complete before watchdog kicks in */
488 grpc_millis keepalive_timeout;
489 /** if keepalive pings are allowed when there are no outstanding streams */
490 bool keepalive_permit_without_calls = false;
491 /** keep-alive state machine state */
492 grpc_chttp2_keepalive_state keepalive_state;
493 grpc_core::ContextList* cl = nullptr;
494 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
495 uint32_t num_messages_in_next_write = 0;
496 /** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
497 * RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
498 * DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
499 * only continue reading when we are able to write to the socket again,
500 * thereby reducing the number of induced frames. */
501 uint32_t num_pending_induced_frames = 0;
502 bool reading_paused_on_pending_induced_frames = false;
506 GRPC_METADATA_NOT_PUBLISHED,
507 GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
508 GRPC_METADATA_PUBLISHED_FROM_WIRE,
509 GRPC_METADATA_PUBLISHED_AT_CLOSE
510 } grpc_published_metadata_method;
512 struct grpc_chttp2_stream {
513 grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
514 const void* server_data, grpc_core::Arena* arena);
515 ~grpc_chttp2_stream();
518 grpc_chttp2_transport* t;
519 grpc_stream_refcount* refcount;
520 // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
521 // before initializing the rest of the stream, to avoid cache misses. This
522 // field MUST be right after `t` and `refcount`.
524 explicit Reffer(grpc_chttp2_stream* s);
527 grpc_closure destroy_stream;
528 grpc_closure* destroy_stream_arg;
530 grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
531 uint8_t included[STREAM_LIST_COUNT] = {};
533 /** HTTP2 stream id for this stream, or zero if one has not been assigned */
536 /** things the upper layers would like to send */
537 grpc_metadata_batch* send_initial_metadata = nullptr;
538 grpc_closure* send_initial_metadata_finished = nullptr;
539 grpc_metadata_batch* send_trailing_metadata = nullptr;
540 grpc_closure* send_trailing_metadata_finished = nullptr;
542 grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
543 uint32_t fetched_send_message_length = 0;
544 grpc_slice fetching_slice = grpc_empty_slice();
545 int64_t next_message_end_offset;
546 int64_t flow_controlled_bytes_written = 0;
547 int64_t flow_controlled_bytes_flowed = 0;
548 grpc_closure complete_fetch_locked;
549 grpc_closure* fetching_send_message_finished = nullptr;
551 grpc_metadata_batch* recv_initial_metadata;
552 grpc_closure* recv_initial_metadata_ready = nullptr;
553 bool* trailing_metadata_available = nullptr;
554 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
555 grpc_closure* recv_message_ready = nullptr;
556 grpc_metadata_batch* recv_trailing_metadata;
557 grpc_closure* recv_trailing_metadata_finished = nullptr;
559 grpc_transport_stream_stats* collecting_stats = nullptr;
560 grpc_transport_stream_stats stats = grpc_transport_stream_stats();
562 /** Is this stream closed for writing. */
563 bool write_closed = false;
564 /** Is this stream reading half-closed. */
565 bool read_closed = false;
566 /** Are all published incoming byte streams closed. */
567 bool all_incoming_byte_streams_finished = false;
568 /** Has this stream seen an error.
569 If true, then pending incoming frames can be thrown away. */
570 bool seen_error = false;
571 /** Are we buffering writes on this stream? If yes, we won't become writable
572 until there's enough queued up in the flow_controlled_buffer */
573 bool write_buffering = false;
575 /* have we sent or received the EOS bit? */
576 bool eos_received = false;
577 bool eos_sent = false;
579 /** the error that resulted in this stream being read-closed */
580 grpc_error* read_closed_error = GRPC_ERROR_NONE;
581 /** the error that resulted in this stream being write-closed */
582 grpc_error* write_closed_error = GRPC_ERROR_NONE;
584 grpc_published_metadata_method published_metadata[2] = {};
585 bool final_metadata_requested = false;
587 grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
589 grpc_slice_buffer frame_storage; /* protected by t combiner */
591 grpc_closure* on_next = nullptr; /* protected by t combiner */
592 bool pending_byte_stream = false; /* protected by t combiner */
593 // cached length of buffer to be used by the transport thread in cases where
594 // stream->pending_byte_stream == true. The value is saved before
595 // application threads are allowed to modify
596 // unprocessed_incoming_frames_buffer
597 size_t unprocessed_incoming_frames_buffer_cached_length = 0;
598 /* Accessed only by transport thread when stream->pending_byte_stream == false
599 * Accessed only by application thread when stream->pending_byte_stream ==
601 grpc_slice_buffer unprocessed_incoming_frames_buffer;
602 grpc_closure reset_byte_stream;
603 grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */
604 bool received_last_frame = false; /* protected by t combiner */
606 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
608 /** saw some stream level error */
609 grpc_error* forced_close_error = GRPC_ERROR_NONE;
610 /** how many header frames have we received? */
611 uint8_t header_frames_received = 0;
612 /** parsing state for data frames */
613 /* Accessed only by transport thread when stream->pending_byte_stream == false
614 * Accessed only by application thread when stream->pending_byte_stream ==
616 grpc_chttp2_data_parser data_parser;
617 /** number of bytes received - reset at end of parse thread execution */
618 int64_t received_bytes = 0;
620 bool sent_initial_metadata = false;
621 bool sent_trailing_metadata = false;
623 grpc_core::PolymorphicManualConstructor<
624 grpc_core::chttp2::StreamFlowControlBase,
625 grpc_core::chttp2::StreamFlowControl,
626 grpc_core::chttp2::StreamFlowControlDisabled>
629 grpc_slice_buffer flow_controlled_buffer;
631 grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
632 grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
633 grpc_chttp2_write_cb* finish_after_write = nullptr;
634 size_t sending_bytes = 0;
636 /* Stream compression method to be used. */
637 grpc_stream_compression_method stream_compression_method =
638 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
639 /* Stream decompression method to be used. */
640 grpc_stream_compression_method stream_decompression_method =
641 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
643 /** Whether bytes stored in unprocessed_incoming_frames_buffer are decompressed
645 bool unprocessed_incoming_frames_decompressed = false;
646 /** Whether the bytes need to be traced using Fathom */
648 /** gRPC header bytes that are already decompressed */
649 size_t decompressed_header_bytes = 0;
650 /** Byte counter for number of bytes written */
651 size_t byte_counter = 0;
653 /** Amount of uncompressed bytes sent out when compressed_data_buffer is
655 size_t uncompressed_data_size;
656 /** Stream compression compress context */
657 grpc_stream_compression_context* stream_compression_ctx;
658 /** Buffer storing data that is compressed but not sent */
659 grpc_slice_buffer compressed_data_buffer;
661 /** Stream compression decompress context */
662 grpc_stream_compression_context* stream_decompression_ctx;
663 /** Temporary buffer storing decompressed data.
664 * Initialized, used, and destroyed only when stream uses (non-identity)
667 grpc_slice_buffer decompressed_data_buffer;
670 /** Transport writing call flow:
671 grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
673 If no other write has been started, a task is enqueued onto our workqueue.
674 When that task executes, it obtains the global lock, and gathers the data
676 The global lock is dropped and we do the syscall to write.
677 After writing, a follow-up check is made to see if another round of writing
680 The actual call chain is documented in the implementation of this function.
// Requests a write on transport `t`. `reason` records why the write is being
// initiated (one of the grpc_chttp2_initiate_write_reason values).
682 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
683 grpc_chttp2_initiate_write_reason reason);
686 /** are we writing? */
688 /** if writing: was it a complete flush (false) or a partial flush (true) */
690 /** did we queue any completions as part of beginning the write */
691 bool early_results_scheduled;
692 } grpc_chttp2_begin_write_result;
/** Begins a write on transport `t` and reports the outcome as a
    grpc_chttp2_begin_write_result (whether a write is in progress, whether the
    flush was partial, and whether completions were queued early). */
694 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
695 grpc_chttp2_transport* t);
/** Counterpart to grpc_chttp2_begin_write().
    NOTE(review): `error` presumably carries the result of the write that just
    finished; ownership of `error` is not visible here -- confirm. */
696 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error);
698 /** Process one slice of incoming data. The outcome is returned as a
699 grpc_error*, not a 1/0 flag. NOTE(review): presumably GRPC_ERROR_NONE means
    the connection is still viable and any other error means it should be torn
    down -- confirm against the implementation. */
700 grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t,
701 const grpc_slice& slice);
/* Stream list manipulation, one group of functions per list kept on the
   transport.
   NOTE(review): presumably add_* links `s` into the list, pop_* unlinks one
   stream and stores it through `*s`, and remove_* unlinks the given stream. The
   meaning of the bool returned by the add_ and remove_ variants is not visible
   from these declarations -- confirm in the implementation. */

/* --- writable list --- */
703 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
704 grpc_chttp2_stream* s);
705 /** Get a writable stream
706 returns true if there was a stream available */
707 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
708 grpc_chttp2_stream** s);
709 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
710 grpc_chttp2_stream* s);
/* --- writing list --- */
712 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
713 grpc_chttp2_stream* s);
714 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
715 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
716 grpc_chttp2_stream** s);
/* --- written list ---
   NOTE(review): no "written" entry is visible in grpc_chttp2_stream_list_id;
   confirm these two declarations still have definitions and callers. */
718 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
719 grpc_chttp2_stream* s);
720 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
721 grpc_chttp2_stream** s);
/* --- waiting-for-concurrency list --- */
723 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
724 grpc_chttp2_stream* s);
725 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
726 grpc_chttp2_stream** s);
727 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
728 grpc_chttp2_stream* s);
/* --- stalled-by-transport list --- */
730 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
731 grpc_chttp2_stream* s);
732 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
733 grpc_chttp2_stream** s);
734 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
735 grpc_chttp2_stream* s);
/* --- stalled-by-stream list --- */
737 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
738 grpc_chttp2_stream* s);
739 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
740 grpc_chttp2_stream** s);
741 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
742 grpc_chttp2_stream* s);
744 /********* Flow Control ***************/
746 // Takes in a flow control action and performs all the needed operations.
// `action` describes what has to be done; `t` and `s` are the transport and
// stream it is applied to.
// NOTE(review): whether `s` may be null for transport-only actions is not
// visible from this declaration -- confirm against callers.
747 void grpc_chttp2_act_on_flowctl_action(
748 const grpc_core::chttp2::FlowControlAction& action,
749 grpc_chttp2_transport* t, grpc_chttp2_stream* s);
751 /********* End of Flow Control ***************/
753 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
754 grpc_chttp2_transport* t, uint32_t id) {
755 return static_cast<grpc_chttp2_stream*>(
756 grpc_chttp2_stream_map_find(&t->stream_map, id));
758 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
/** Reports a GOAWAY received on transport `t`, with its error code, last stream
    id and debug text.
    NOTE(review): presumably this is what populates t->goaway_error; ownership
    of `goaway_text` is not visible here -- confirm. */
761 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
762 uint32_t goaway_error,
763 uint32_t last_stream_id,
764 const grpc_slice& goaway_text);
/** NOTE(review): presumably switches t->parser to one that discards the rest of
    the current frame -- confirm against the implementation. */
766 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
/** Completes one step of the closure referenced by `*pclosure` on stream `s`.
    `desc` is a descriptive label for the step.
    NOTE(review): `pclosure` is a pointer-to-pointer, so the callee can presumably
    clear the caller's closure; ownership of `error` is not visible here --
    confirm. */
768 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
769 grpc_chttp2_stream* s,
770 grpc_closure** pclosure,
771 grpc_error* error, const char* desc);
/* NOTE(review): presumably the size of the gRPC message framing header
   (1-byte flag + 4-byte length) -- confirm against the data frame parser. */
773 #define GRPC_HEADER_SIZE_IN_BYTES 5
/* All-ones size_t, i.e. the largest value a size_t can hold. */
774 #define MAX_SIZE_T (~(size_t)0)
/* The HTTP/2 client connection preface: 24 bytes, one per
   GRPC_DTS_CLIENT_PREFIX_* deframer state. */
776 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
/* Length of the preface, excluding the literal's terminating NUL. */
777 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
778 (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
780 // extern grpc_core::TraceFlag grpc_http_trace;
781 // extern grpc_core::TraceFlag grpc_flowctl_trace;
783 #define GRPC_CHTTP2_IF_TRACING(stmt) \
785 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
/** NOTE(review): presumably synthesizes status metadata for `stream` from
    `error` (cf. GRPC_METADATA_SYNTHESIZED_FROM_FAKE); ownership of `error` is
    not visible here -- confirm. */
790 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
791 grpc_chttp2_stream* stream, grpc_error* error);
/** Marks stream `s` closed. `close_reads` and `close_writes` are int flags
    selecting which direction(s) to close; `error` is the associated error.
    NOTE(review): the flags are presumably treated as booleans -- confirm. */
792 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
793 grpc_chttp2_stream* s, int close_reads,
794 int close_writes, grpc_error* error);
/** NOTE(review): no definition or caller is visible in this header -- confirm
    that this declaration is still in use. */
795 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
798 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
799 grpc_chttp2_stream_ref(stream, reason)
800 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
801 grpc_chttp2_stream_unref(stream, reason)
802 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
803 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
805 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
806 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
807 grpc_chttp2_stream_unref(stream)
808 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
809 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
813 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
814 grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
815 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
816 grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
817 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
818 const char* reason, const char* file,
820 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
821 grpc_core::Delete(t);
824 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
825 const char* reason, const char* file,
827 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
830 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
831 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
832 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
833 if (t->refs.Unref()) {
834 grpc_core::Delete(t);
837 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
/** Handles the acknowledgement of the ping identified by `id` on transport `t`.
    NOTE(review): `id` is presumably the 64-bit ping payload (cf. ping_ctr /
    ping_queue.inflight_id) -- confirm. */
842 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
844 /** Add a new ping strike to ping_recv_state.ping_strikes. If
845 ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
846 with error code ENHANCE_YOUR_CALM and additional debug data resembling
847 "too_many_pings" followed by immediately closing the connection. */
848 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
850 /** add a ref to the stream and add it to the writable list;
851 ref will be dropped in writing.c */
852 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
853 grpc_chttp2_stream* s);
/** Cancels stream `s`; `due_to_error` is the reason for the cancellation.
    NOTE(review): ownership of `due_to_error` is not visible here -- confirm. */
855 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
856 grpc_error* due_to_error);
/** NOTE(review): the three maybe_complete_* functions presumably run the
    matching pending recv closure on `s` (recv_initial_metadata_ready,
    recv_message_ready, recv_trailing_metadata_finished) once its data is
    available, and do nothing otherwise -- confirm. */
858 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
859 grpc_chttp2_stream* s);
860 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
861 grpc_chttp2_stream* s);
862 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
863 grpc_chttp2_stream* s);
/** Fails the writes still pending on stream `s` with `error`.
    NOTE(review): ownership of `error` is not visible here -- confirm. */
865 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
866 grpc_chttp2_stream* s, grpc_error* error);
868 /** Set the default keepalive configurations, must only be called at
870 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
873 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */