3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
22 #include <grpc/support/port_platform.h>
26 #include "src/core/lib/channel/context.h"
27 #include "src/core/lib/gprpp/arena.h"
28 #include "src/core/lib/iomgr/call_combiner.h"
29 #include "src/core/lib/iomgr/endpoint.h"
30 #include "src/core/lib/iomgr/polling_entity.h"
31 #include "src/core/lib/iomgr/pollset.h"
32 #include "src/core/lib/iomgr/pollset_set.h"
33 #include "src/core/lib/slice/slice_internal.h"
34 #include "src/core/lib/transport/byte_stream.h"
35 #include "src/core/lib/transport/metadata_batch.h"
37 /* Minimum and maximum protocol accepted versions. */
38 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
39 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
40 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
41 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1
43 /* forward declarations */
45 typedef struct grpc_transport grpc_transport;
47 /* grpc_stream doesn't actually exist. It's used as a typesafe
48 opaque pointer for whatever data the transport wants to track
50 typedef struct grpc_stream grpc_stream;
52 extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount;
54 typedef struct grpc_stream_refcount {
55 grpc_core::RefCount refs;
58 const char* object_type;
60 grpc_slice_refcount slice_refcount;
61 } grpc_stream_refcount;
64 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
65 grpc_iomgr_cb_func cb, void* cb_arg,
66 const char* object_type);
67 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
68 grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
70 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
71 grpc_iomgr_cb_func cb, void* cb_arg);
72 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
73 grpc_stream_ref_init(rc, ir, cb, cb_arg)
77 inline void grpc_stream_ref(grpc_stream_refcount* refcount,
79 if (grpc_trace_stream_refcount.enabled()) {
80 gpr_log(GPR_DEBUG, "%s %p:%p REF %s", refcount->object_type, refcount,
81 refcount->destroy.cb_arg, reason);
83 refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
86 inline void grpc_stream_ref(grpc_stream_refcount* refcount) {
87 refcount->refs.RefNonZero();
91 void grpc_stream_destroy(grpc_stream_refcount* refcount);
94 inline void grpc_stream_unref(grpc_stream_refcount* refcount,
96 if (grpc_trace_stream_refcount.enabled()) {
97 gpr_log(GPR_DEBUG, "%s %p:%p UNREF %s", refcount->object_type, refcount,
98 refcount->destroy.cb_arg, reason);
100 if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
101 grpc_stream_destroy(refcount);
105 inline void grpc_stream_unref(grpc_stream_refcount* refcount) {
106 if (GPR_UNLIKELY(refcount->refs.Unref())) {
107 grpc_stream_destroy(refcount);
112 /* Wrap a buffer that is owned by some stream object into a slice that shares
114 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
115 void* buffer, size_t length);
117 struct grpc_transport_one_way_stats {
118 uint64_t framing_bytes = 0;
119 uint64_t data_bytes = 0;
120 uint64_t header_bytes = 0;
123 struct grpc_transport_stream_stats {
124 grpc_transport_one_way_stats incoming;
125 grpc_transport_one_way_stats outgoing;
128 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
129 grpc_transport_one_way_stats* to);
131 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
132 grpc_transport_stream_stats* to);
134 // This struct (which is present in both grpc_transport_stream_op_batch
135 // and grpc_transport_op_batch) is a convenience to allow filters or
136 // transports to schedule a closure related to a particular batch without
137 // having to allocate memory. The general pattern is to initialize the
138 // closure with the callback arg set to the batch and extra_arg set to
139 // whatever state is associated with the handler (e.g., the call element
140 // or the transport stream object).
142 // Note that this can only be used by the current handler of a given
143 // batch on the way down the stack (i.e., whichever filter or transport is
144 // currently handling the batch). Once a filter or transport passes control
145 // of the batch to the next handler, it cannot depend on the contents of
146 // this struct anymore, because the next handler may reuse it.
147 struct grpc_handler_private_op_data {
148 void* extra_arg = nullptr;
149 grpc_closure closure;
150 grpc_handler_private_op_data() { memset(&closure, 0, sizeof(closure)); }
153 typedef struct grpc_transport_stream_op_batch_payload
154 grpc_transport_stream_op_batch_payload;
156 /* Transport stream op: a set of operations to perform on a transport
157 against a single stream */
158 struct grpc_transport_stream_op_batch {
159 grpc_transport_stream_op_batch()
160 : send_initial_metadata(false),
161 send_trailing_metadata(false),
163 recv_initial_metadata(false),
165 recv_trailing_metadata(false),
166 cancel_stream(false),
169 /** Should be scheduled when all of the non-recv operations in the batch
172 The recv ops (recv_initial_metadata, recv_message, and
173 recv_trailing_metadata) each have their own callbacks. If a batch
174 contains both recv ops and non-recv ops, on_complete should be
175 scheduled as soon as the non-recv ops are complete, regardless of
176 whether or not the recv ops are complete. If a batch contains
177 only recv ops, on_complete can be null. */
178 grpc_closure* on_complete = nullptr;
180 /** Values for the stream op (fields set are determined by flags above) */
181 grpc_transport_stream_op_batch_payload* payload = nullptr;
183 /** Send initial metadata to the peer, from the provided metadata batch. */
184 bool send_initial_metadata : 1;
186 /** Send trailing metadata to the peer, from the provided metadata batch. */
187 bool send_trailing_metadata : 1;
189 /** Send message data to the peer, from the provided byte stream. */
190 bool send_message : 1;
192 /** Receive initial metadata from the stream, into provided metadata batch. */
193 bool recv_initial_metadata : 1;
195 /** Receive message data from the stream, into provided byte stream. */
196 bool recv_message : 1;
198 /** Receive trailing metadata from the stream, into provided metadata batch.
200 bool recv_trailing_metadata : 1;
202 /** Cancel this stream with the provided error */
203 bool cancel_stream : 1;
205 /** Is this stream traced */
208 /***************************************************************************
209 * remaining fields are initialized and used at the discretion of the
210 * current handler of the op */
212 grpc_handler_private_op_data handler_private;
215 struct grpc_transport_stream_op_batch_payload {
216 explicit grpc_transport_stream_op_batch_payload(
217 grpc_call_context_element* context)
218 : context(context) {}
219 ~grpc_transport_stream_op_batch_payload() {
220 // We don't really own `send_message`, so release ownership and let the
221 // owner clean the data.
222 send_message.send_message.release();
226 grpc_metadata_batch* send_initial_metadata = nullptr;
227 /** Iff send_initial_metadata != NULL, flags associated with
228 send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
229 uint32_t send_initial_metadata_flags = 0;
230 // If non-NULL, will be set by the transport to the peer string (a char*).
231 // The transport retains ownership of the string.
232 // Note: This pointer may be used by the transport after the
233 // send_initial_metadata op is completed. It must remain valid
234 // until the call is destroyed.
235 gpr_atm* peer_string = nullptr;
236 } send_initial_metadata;
239 grpc_metadata_batch* send_trailing_metadata = nullptr;
240 } send_trailing_metadata;
243 // The transport (or a filter that decides to return a failure before
244 // the op gets down to the transport) takes ownership.
245 // The batch's on_complete will not be called until after the byte
246 // stream is orphaned.
247 grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
248 // Set by the transport if the stream has been closed for writes. If this
249 // is set and send message op is present, we set the operation to be a
250 // failure without sending a cancel OP down the stack. This is so that the
251 // status of the call does not get overwritten by the Cancel OP, which would
252 // be especially problematic if we had received a valid status from the
254 // For send_initial_metadata, it is fine for the status to be overwritten
255 // because at that point, the client will not have received a status.
256 // For send_trailing_metadata, we might overwrite the status if we have
257 // non-zero metadata to send. This is fine because the API does not allow
258 // the client to send trailing metadata.
259 bool stream_write_closed = false;
263 grpc_metadata_batch* recv_initial_metadata = nullptr;
264 // Flags are used only on the server side. If non-null, will be set to
265 // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
266 // indicate if the call is idempotent).
267 uint32_t* recv_flags = nullptr;
268 /** Should be enqueued when initial metadata is ready to be processed. */
269 grpc_closure* recv_initial_metadata_ready = nullptr;
270 // If not NULL, will be set to true if trailing metadata is
271 // immediately available. This may be a signal that we received a
272 // Trailers-Only response.
273 bool* trailing_metadata_available = nullptr;
274 // If non-NULL, will be set by the transport to the peer string (a char*).
275 // The transport retains ownership of the string.
276 // Note: This pointer may be used by the transport after the
277 // recv_initial_metadata op is completed. It must remain valid
278 // until the call is destroyed.
279 gpr_atm* peer_string = nullptr;
280 } recv_initial_metadata;
283 // Will be set by the transport to point to the byte stream
284 // containing a received message.
285 // Will be NULL if trailing metadata is received instead of a message.
286 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
287 /** Should be enqueued when one message is ready to be processed. */
288 grpc_closure* recv_message_ready = nullptr;
292 grpc_metadata_batch* recv_trailing_metadata = nullptr;
293 grpc_transport_stream_stats* collect_stats = nullptr;
294 /** Should be enqueued when initial metadata is ready to be processed. */
295 grpc_closure* recv_trailing_metadata_ready = nullptr;
296 } recv_trailing_metadata;
298 /** Forcefully close this stream.
299 The HTTP2 semantics should be:
300 - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
301 trailing metadata has not been sent, send trailing metadata with status
302 and message from cancel_error (use grpc_error_get_status) followed by
303 a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
304 - at all other times: use grpc_error_get_status to get a status code, and
305 convert to a HTTP2 error code using
306 grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
309 // Error contract: the transport that gets this op must cause cancel_error
310 // to be unref'ed after processing it
311 grpc_error* cancel_error = GRPC_ERROR_NONE;
314 /* Indexes correspond to grpc_context_index enum values */
315 grpc_call_context_element* context;
318 /** Transport op: a set of operations to perform on a transport as a whole */
319 typedef struct grpc_transport_op {
320 /** Called when processing of this op is done. */
321 grpc_closure* on_consumed = nullptr;
322 /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
323 grpc_closure* on_connectivity_state_change = nullptr;
324 grpc_connectivity_state* connectivity_state = nullptr;
325 /** should the transport be disconnected
326 * Error contract: the transport that gets this op must cause
327 * disconnect_with_error to be unref'ed after processing it */
328 grpc_error* disconnect_with_error = nullptr;
329 /** what should the goaway contain?
330 * Error contract: the transport that gets this op must cause
331 * goaway_error to be unref'ed after processing it */
332 grpc_error* goaway_error = nullptr;
333 /** set the callback for accepting new streams;
334 this is a permanent callback, unlike the other one-shot closures.
335 If true, the callback is set to set_accept_stream_fn, with its
336 user_data argument set to set_accept_stream_user_data */
337 bool set_accept_stream = false;
338 void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
339 const void* server_data) = nullptr;
340 void* set_accept_stream_user_data = nullptr;
341 /** add this transport to a pollset */
342 grpc_pollset* bind_pollset = nullptr;
343 /** add this transport to a pollset_set */
344 grpc_pollset_set* bind_pollset_set = nullptr;
345 /** send a ping, if either on_initiate or on_ack is not NULL */
347 /** Ping may be delayed by the transport, on_initiate callback will be
348 called when the ping is actually being sent. */
349 grpc_closure* on_initiate = nullptr;
350 /** Called when the ping ack is received */
351 grpc_closure* on_ack = nullptr;
353 // If true, will reset the channel's connection backoff.
354 bool reset_connect_backoff = false;
356 /***************************************************************************
357 * remaining fields are initialized and used at the discretion of the
358 * transport implementation */
360 grpc_handler_private_op_data handler_private;
363 /* Returns the amount of memory required to store a grpc_stream for this
365 size_t grpc_transport_stream_size(grpc_transport* transport);
367 /* Initialize transport data for a stream.
369 Returns 0 on success, any other (transport-defined) value for failure.
370 May assume that stream contains all-zeros.
373 transport - the transport on which to create this stream
374 stream - a pointer to uninitialized memory to initialize
375 server_data - either NULL for a client initiated stream, or a pointer
376 supplied from the accept_stream callback function */
377 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
378 grpc_stream_refcount* refcount,
379 const void* server_data,
380 grpc_core::Arena* arena);
382 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
383 grpc_polling_entity* pollent);
385 /* Destroy transport data for a stream.
387 Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
388 received by the up-layer. Must not be called in the same call stack as
392 transport - the transport on which to create this stream
393 stream - the grpc_stream to destroy (memory is still owned by the
394 caller, but any child memory must be cleaned up) */
395 void grpc_transport_destroy_stream(grpc_transport* transport,
397 grpc_closure* then_schedule_closure);
399 void grpc_transport_stream_op_batch_finish_with_failure(
400 grpc_transport_stream_op_batch* op, grpc_error* error,
401 grpc_core::CallCombiner* call_combiner);
403 char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op);
404 char* grpc_transport_op_string(grpc_transport_op* op);
406 /* Send a batch of operations on a transport
408 Takes ownership of any objects contained in ops.
411 transport - the transport on which to initiate the stream
412 stream - the stream on which to send the operations. This must be
413 non-NULL and previously initialized by the same transport.
414 op - a grpc_transport_stream_op_batch specifying the op to perform
416 void grpc_transport_perform_stream_op(grpc_transport* transport,
418 grpc_transport_stream_op_batch* op);
420 void grpc_transport_perform_op(grpc_transport* transport,
421 grpc_transport_op* op);
423 /* Send a ping on a transport
425 Calls cb with user data when a response is received. */
426 void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
428 /* Advise peer of pending connection termination. */
429 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
430 grpc_slice debug_data);
432 /* Destroy the transport */
433 void grpc_transport_destroy(grpc_transport* transport);
435 /* Get the endpoint used by \a transport */
436 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);
438 /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
439 \a on_consumed and then delete the returned transport op */
440 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed);
441 /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed
443 to \a on_consumed and then delete the returned transport op */
444 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
445 grpc_closure* on_consumed);
447 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */