3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/surface/validate_metadata.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/timeout_encoding.h"
44 #include "src/core/lib/transport/transport_impl.h"
46 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
// Size in bytes of the gRPC message prefix that precedes every payload: one
// flag byte followed by a four byte length (see create_grpc_frame and
// parse_grpc_header).
48 #define GRPC_HEADER_SIZE_IN_BYTES 5
// Size of the scratch buffer used to drain incoming data once a stream has
// entered the flush-read state (see maybe_flush_read).
49 #define GRPC_FLUSH_READ_SIZE 4096
// Trace flag registered under the name cronet and constructed disabled; it
// gates all CRONET_LOG output.
51 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
// Forwards its arguments to gpr_log only when grpc_cronet_trace is enabled.
52 #define CRONET_LOG(...) \
54 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
58 ACTION_TAKEN_WITH_CALLBACK,  // execute_from_storage then waits for a Cronet callback
59 ACTION_TAKEN_NO_CALLBACK,  // execute_from_storage then keeps processing the same op
// The identifiers below are used as indices into the state_op_done and
// state_callback_received arrays of op_state.
64 OP_SEND_INITIAL_METADATA = 0,
66 OP_SEND_TRAILING_METADATA,
68 OP_RECV_INITIAL_METADATA,
69 OP_RECV_TRAILING_METADATA,
75 OP_RECV_MESSAGE_AND_ON_COMPLETE,
80 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
// Forward declarations so that the handlers can be listed in cronet_callbacks
// before their definitions further down in this file.
82 static void on_stream_ready(bidirectional_stream*);
83 static void on_response_headers_received(
84 bidirectional_stream*, const bidirectional_stream_header_array*,
86 static void on_write_completed(bidirectional_stream*, const char*);
87 static void on_read_completed(bidirectional_stream*, char*, int);
88 static void on_response_trailers_received(
89 bidirectional_stream*, const bidirectional_stream_header_array*);
90 static void on_succeeded(bidirectional_stream*);
91 static void on_failed(bidirectional_stream*, int);
92 static void on_canceled(bidirectional_stream*);
// Callback table whose address is passed to bidirectional_stream_create when
// a Cronet stream is started (see execute_stream_op).
93 static bidirectional_stream_callback cronet_callbacks = {
95 on_response_headers_received,
98 on_response_trailers_received,
103 /* Cronet transport object */
104 struct grpc_cronet_transport {
105 grpc_transport base; /* must be first element in this structure */
// Cronet engine handed to bidirectional_stream_create for each new stream.
106 stream_engine* engine;
// When true, the transport calls bidirectional_stream_flush explicitly (see
// on_stream_ready and on_response_trailers_received).
108 bool use_packet_coalescing;
// Alias so the type can be named without the struct keyword.
110 typedef struct grpc_cronet_transport grpc_cronet_transport;
112 /* TODO (makdharma): reorder structure for memory efficiency per
113 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
// Constructor: both metadata buffers are built on the supplied arena and the
// slice buffer that collects message bytes is initialized.
115 read_state(grpc_core::Arena* arena)
116 : trailing_metadata(arena), initial_metadata(arena) {
117 grpc_slice_buffer_init(&read_slice_buffer);
120 /* vars to store data coming from server */
// Destination of the read issued to Cronet. It may point at grpc_header_bytes
// or at a gpr_malloc buffer; null_and_maybe_free_read_buffer tells the two
// apart before freeing.
121 char* read_buffer = nullptr;
122 bool length_field_received = false;
// Bytes received so far and bytes still expected for the current read; both
// are updated in on_read_completed.
123 int received_bytes = 0;
124 int remaining_bytes = 0;
125 int length_field = 0;
// Inline storage for the message prefix (GRPC_HEADER_SIZE_IN_BYTES bytes).
127 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
128 char* payload_field = nullptr;
// Set in on_read_completed when a read completes without delivering bytes.
129 bool read_stream_closed = 0;
131 /* vars for holding data destined for the application */
132 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
133 grpc_slice_buffer read_slice_buffer;
135 /* vars for trailing metadata */
136 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
// Set in on_response_trailers_received when at least one trailer arrived.
137 bool trailing_metadata_valid = false;
139 /* vars for initial metadata */
140 grpc_chttp2_incoming_metadata_buffer initial_metadata;
// Member of write_state (reached as state.ws.write_buffer): heap buffer for
// the frame being written, released with gpr_free in on_write_completed,
// on_failed and on_canceled.
144 char* write_buffer = nullptr;
147 /* track state of one stream op */
// Constructor: the arena is only forwarded to the embedded read_state.
149 op_state(grpc_core::Arena* arena) : rs(arena) {}
// Indexed by e_op_id. state_op_done records that an op has been executed;
// state_callback_received records that the matching Cronet callback arrived.
151 bool state_op_done[OP_NUM_OPS] = {};
152 bool state_callback_received[OP_NUM_OPS] = {};
153 /* A non-zero gRPC status code has been seen */
154 bool fail_state = false;
155 /* Transport is discarding all buffered messages */
156 bool flush_read = false;
// Read in on_stream_ready: when packet coalescing is on and this flag is set,
// the Cronet stream is flushed as soon as it becomes ready.
157 bool flush_cronet_when_ready = false;
158 bool pending_write_for_trailer = false;
// Set in add_to_storage when a queued batch carries send_message.
159 bool pending_send_message = false;
160 /* User requested RECV_TRAILING_METADATA */
161 bool pending_recv_trailing_metadata = false;
162 /* Cronet has not issued a callback of a bidirectional read */
163 bool pending_read_from_cronet = false;
164 grpc_error* cancel_error = GRPC_ERROR_NONE;
165 /* data structure for storing data coming from server */
166 struct read_state rs;
167 /* data structure for storing data going to the server */
168 struct write_state ws;
// One queued batch: a copy of the batch handed to the transport together with
// its own op_state. Nodes form a singly linked list whose head lives in the
// op_storage of the stream.
173 struct op_and_state {
174 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
// Copy of the batch (the constructor initializes it from its argument).
176 grpc_transport_stream_op_batch op;
// Progress of this particular batch, as opposed to the stream-wide state.
177 struct op_state state;
179 struct stream_obj* s; /* Pointer back to the stream object */
180 /* next op_and_state in the linked list */
181 struct op_and_state* next = nullptr;
// Members of op_storage (reached as s->storage): number of queued nodes and
// the head of the list. add_to_storage and remove_from_storage maintain both.
185 int num_pending_ops = 0;
186 struct op_and_state* head = nullptr;
// Per-stream object. The Cronet callbacks in this file recover it from
// stream->annotation.
190 stream_obj(grpc_transport* gt, grpc_stream* gs,
191 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
// Arena passed to the op_state of every op_and_state queued on this stream.
194 grpc_core::Arena* arena;
195 struct op_and_state* oas = nullptr;
196 grpc_transport_stream_op_batch* curr_op = nullptr;
// Owning transport; the callbacks read use_packet_coalescing through it.
197 grpc_cronet_transport* curr_ct;
198 grpc_stream* curr_gs;
// Cronet stream handle, initialized to null. It is destroyed with
// bidirectional_stream_destroy in on_failed, on_canceled and on_succeeded.
199 bidirectional_stream* cbs = nullptr;
// Request headers given to Cronet. The headers pointer is released in
// on_stream_ready, on_failed and on_canceled.
200 bidirectional_stream_header_array header_array =
201 bidirectional_stream_header_array(); // Zero-initialize the structure.
203 /* Stream level state. Some state will be tracked both at stream and stream_op
205 struct op_state state;
208 struct op_storage storage;
210 /* Mutex to protect storage */
213 /* Refcount object of the stream */
214 grpc_stream_refcount* refcount;
// Take or release a reference on the refcount of a stream. Two variants of the
// macros and helpers are visible below: the first forwards a reason string to
// grpc_stream_ref and grpc_stream_unref, the second drops it. The preprocessor
// conditional that selects between them is not visible in this view.
218 #define GRPC_CRONET_STREAM_REF(stream, reason) \
219 grpc_cronet_stream_ref((stream), (reason))
220 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
221 grpc_cronet_stream_unref((stream), (reason))
// Variant that passes the reason through.
222 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
223 grpc_stream_ref(s->refcount, reason);
225 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
226 grpc_stream_unref(s->refcount, reason);
// Variant that ignores the reason argument of the macros.
229 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
230 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
231 grpc_cronet_stream_unref((stream))
232 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
233 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
// Forward declaration: execute_from_storage calls execute_stream_op before
// its definition appears.
236 static enum e_op_result execute_stream_op(struct op_and_state* oas);
239 Utility function to translate enum into string for printing
// Returns the enumerator name of an e_op_result value as a string literal.
// The result is used for CRONET_LOG output in execute_from_storage.
241 static const char* op_result_string(enum e_op_result i) {
243 case ACTION_TAKEN_WITH_CALLBACK:
244 return "ACTION_TAKEN_WITH_CALLBACK";
245 case ACTION_TAKEN_NO_CALLBACK:
246 return "ACTION_TAKEN_NO_CALLBACK";
247 case NO_ACTION_POSSIBLE:
248 return "NO_ACTION_POSSIBLE";
// Fallback after the listed cases: yields UNKNOWN through GPR_UNREACHABLE_CODE.
250 GPR_UNREACHABLE_CODE(return "UNKNOWN");
// Returns the enumerator name of an e_op_id value as a string literal. The
// result is used for CRONET_LOG output in op_can_be_run.
253 static const char* op_id_string(enum e_op_id i) {
255 case OP_SEND_INITIAL_METADATA:
256 return "OP_SEND_INITIAL_METADATA";
257 case OP_SEND_MESSAGE:
258 return "OP_SEND_MESSAGE";
259 case OP_SEND_TRAILING_METADATA:
260 return "OP_SEND_TRAILING_METADATA";
261 case OP_RECV_MESSAGE:
262 return "OP_RECV_MESSAGE";
263 case OP_RECV_INITIAL_METADATA:
264 return "OP_RECV_INITIAL_METADATA";
265 case OP_RECV_TRAILING_METADATA:
266 return "OP_RECV_TRAILING_METADATA";
267 case OP_CANCEL_ERROR:
268 return "OP_CANCEL_ERROR";
270 return "OP_ON_COMPLETE";
274 return "OP_SUCCEEDED";
276 return "OP_CANCELED";
277 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
278 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
279 case OP_READ_REQ_MADE:
280 return "OP_READ_REQ_MADE";
// Resets s->state.rs.read_buffer to null. The old buffer is released with
// gpr_free only when it is non-null and is not the inline grpc_header_bytes
// array, which is a member of read_state and must not be passed to gpr_free.
287 static void null_and_maybe_free_read_buffer(stream_obj* s) {
288 if (s->state.rs.read_buffer &&
289 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
290 gpr_free(s->state.rs.read_buffer);
292 s->state.rs.read_buffer = nullptr;
// Enters the flush-read state when both preconditions described below hold
// and the stream is neither flushing already nor closed for reading.
295 static void maybe_flush_read(stream_obj* s) {
296 /* To enter flush read state (discarding all the buffered messages in
297 * transport layer), two conditions must be satisfied: 1) non-zero grpc status
298 * has been received, and 2) an op requesting the status code
299 * (RECV_TRAILING_METADATA) is issued by the user. (See
300 * doc/status_ordering.md) */
301 /* Whenever the evaluation of any of the two condition is changed, we check
302 * whether we should enter the flush read state. */
303 if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
304 if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
305 CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
306 s->state.flush_read = true;
// Replace whatever read buffer was in use with a scratch buffer of
// GRPC_FLUSH_READ_SIZE bytes that receives the data to be discarded.
307 null_and_maybe_free_read_buffer(s);
308 s->state.rs.read_buffer =
309 static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
// Issue a new read only when none is outstanding; otherwise the pending
// read completes first and on_read_completed continues the draining.
310 if (!s->state.pending_read_from_cronet) {
311 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
312 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
313 GRPC_FLUSH_READ_SIZE);
314 s->state.pending_read_from_cronet = true;
// Builds a grpc_error from a copy of desc and attaches error_code to it under
// the GRPC_ERROR_INT_GRPC_STATUS key.
320 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
321 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
322 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
// Constructs a queue node: copies the batch, builds the per-op state on the
// arena of the stream and stores the back pointer to the stream.
326 inline op_and_state::op_and_state(stream_obj* s,
327 const grpc_transport_stream_op_batch& op)
328 : op(op), state(s->arena), s(s) {}
331 Add a new stream op to op storage.
// Parameters: s is the stream whose queue receives the op; op is the batch to
// copy into a newly allocated op_and_state node.
333 static void add_to_storage(struct stream_obj* s,
334 grpc_transport_stream_op_batch* op) {
335 struct op_storage* storage = &s->storage;
336 /* add new op at the beginning of the linked list. The memory is freed
337 in remove_from_storage */
338 op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
// Push the node at the front of the list and count it.
340 new_op->next = storage->head;
341 storage->head = new_op;
342 storage->num_pending_ops++;
// Record stream-wide hints that op_can_be_run and maybe_flush_read consult.
343 if (op->send_message) {
344 s->state.pending_send_message = true;
346 if (op->recv_trailing_metadata) {
347 s->state.pending_recv_trailing_metadata = true;
350 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
351 storage->num_pending_ops);
352 gpr_mu_unlock(&s->mu);
356 Traverse the linked list and delete op and free memory
// Parameters: s is the stream owning the queue; oas is the node to unlink and
// delete. An empty queue or a null oas is checked for first.
358 static void remove_from_storage(struct stream_obj* s,
359 struct op_and_state* oas) {
360 struct op_and_state* curr;
361 if (s->storage.head == nullptr || oas == nullptr) {
// Case 1: the node is the list head.
364 if (s->storage.head == oas) {
365 s->storage.head = oas->next;
366 grpc_core::Delete(oas);
367 s->storage.num_pending_ops--;
368 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
369 s->storage.num_pending_ops);
// Case 2: search for the predecessor of the node and unlink it there.
371 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
372 if (curr->next == oas) {
373 curr->next = oas->next;
374 s->storage.num_pending_ops--;
375 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
376 s->storage.num_pending_ops);
377 grpc_core::Delete(oas);
// The tail was reached without finding the node; this is only logged.
379 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
380 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
387 Cycle through ops and try to take next action. Break when either
388 an action with callback is taken, or no action is possible.
389 This can get executed from the Cronet network thread via cronet callback
390 or on the application supplied thread via the perform_stream_op function.
// Parameter: s is the stream whose queued ops (s->storage) are processed.
392 static void execute_from_storage(stream_obj* s) {
// The loop has no increment expression: curr is advanced inside the body,
// depending on the result of execute_stream_op.
394 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
395 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
// A node that is already done should have been removed from the queue.
396 GPR_ASSERT(!curr->done);
397 enum e_op_result result = execute_stream_op(curr);
398 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
399 op_result_string(result));
400 /* if this op is done, then remove it and free memory */
// Save the successor first: remove_from_storage deletes curr.
402 struct op_and_state* next = curr->next;
403 remove_from_storage(s, curr);
405 } else if (result == NO_ACTION_POSSIBLE) {
407 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
408 /* wait for the callback */
410 } /* continue processing the same op if ACTION_TAKEN_NO_CALLBACK */
412 gpr_mu_unlock(&s->mu);
// Copies every header of a Cronet header array into a gRPC incoming metadata
// buffer.
// Parameters: header_array is the Cronet array to read; mds is the buffer
// that receives one metadata element per header.
415 static void convert_cronet_array_to_metadata(
416 const bidirectional_stream_header_array* header_array,
417 grpc_chttp2_incoming_metadata_buffer* mds) {
418 for (size_t i = 0; i < header_array->count; i++) {
419 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
420 header_array->headers[i].key, header_array->headers[i].value);
// Keys are interned slices wrapping the strings owned by Cronet.
421 grpc_slice key = grpc_slice_intern(
422 grpc_slice_from_static_string(header_array->headers[i].key));
// Values of binary headers are base64 decoded before being interned; all
// other values are interned as they are.
424 if (grpc_is_refcounted_slice_binary_header(key)) {
425 value = grpc_slice_from_static_string(header_array->headers[i].value);
426 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
427 value, grpc_chttp2_base64_infer_length_after_decode(value)));
429 value = grpc_slice_intern(
430 grpc_slice_from_static_string(header_array->headers[i].value));
// A failure to add the element is logged and otherwise ignored.
432 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
433 grpc_chttp2_incoming_metadata_buffer_add(
434 mds, grpc_mdelem_from_slices(key, value)));
// Cronet callback for a failed stream; net_error is the code reported by
// Cronet. Unlike the other callbacks this one logs through gpr_log directly,
// so the message is not gated by the cronet trace flag.
441 static void on_failed(bidirectional_stream* stream, int net_error) {
442 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
443 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
444 grpc_core::ExecCtx exec_ctx;
446 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
// Tear down the Cronet stream and record that the failure callback arrived.
448 bidirectional_stream_destroy(s->cbs);
449 s->state.state_callback_received[OP_FAILED] = true;
// Release the request headers and the write and read buffers, if any.
451 if (s->header_array.headers) {
452 gpr_free(s->header_array.headers);
453 s->header_array.headers = nullptr;
455 if (s->state.ws.write_buffer) {
456 gpr_free(s->state.ws.write_buffer);
457 s->state.ws.write_buffer = nullptr;
459 null_and_maybe_free_read_buffer(s);
460 gpr_mu_unlock(&s->mu);
// Re-evaluate the queued ops now that the failure is recorded, then drop the
// stream reference tagged cronet transport.
461 execute_from_storage(s);
462 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
// Cronet callback for a canceled stream. It performs the same cleanup as
// on_failed but records OP_CANCELED.
468 static void on_canceled(bidirectional_stream* stream) {
469 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
470 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
471 grpc_core::ExecCtx exec_ctx;
473 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
// Tear down the Cronet stream and record that the cancel callback arrived.
475 bidirectional_stream_destroy(s->cbs);
476 s->state.state_callback_received[OP_CANCELED] = true;
// Release the request headers and the write and read buffers, if any.
478 if (s->header_array.headers) {
479 gpr_free(s->header_array.headers);
480 s->header_array.headers = nullptr;
482 if (s->state.ws.write_buffer) {
483 gpr_free(s->state.ws.write_buffer);
484 s->state.ws.write_buffer = nullptr;
486 null_and_maybe_free_read_buffer(s);
487 gpr_mu_unlock(&s->mu);
// Re-evaluate the queued ops, then drop the stream reference tagged
// cronet transport.
488 execute_from_storage(s);
489 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
// Cronet callback for a stream that finished successfully.
495 static void on_succeeded(bidirectional_stream* stream) {
496 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
497 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
498 grpc_core::ExecCtx exec_ctx;
500 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
// Tear down the Cronet stream and record that the success callback arrived.
502 bidirectional_stream_destroy(s->cbs);
503 s->state.state_callback_received[OP_SUCCEEDED] = true;
// Only the read buffer is released here; the header array and write buffer
// are not touched by this callback.
505 null_and_maybe_free_read_buffer(s);
506 gpr_mu_unlock(&s->mu);
// Re-evaluate the queued ops, then drop the stream reference tagged
// cronet transport.
507 execute_from_storage(s);
508 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
// Cronet callback signalling that the stream is ready. It is treated as the
// completion of OP_SEND_INITIAL_METADATA.
514 static void on_stream_ready(bidirectional_stream* stream) {
515 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
516 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
517 grpc_core::ExecCtx exec_ctx;
518 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
519 grpc_cronet_transport* t = s->curr_ct;
// Mark the op both as executed and as acknowledged by Cronet.
521 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
522 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
523 /* Free the memory allocated for headers */
524 if (s->header_array.headers) {
525 gpr_free(s->header_array.headers);
526 s->header_array.headers = nullptr;
528 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
529 * SEND_TRAILING_METADATA ops pending */
530 if (t->use_packet_coalescing) {
531 if (s->state.flush_cronet_when_ready) {
532 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
533 bidirectional_stream_flush(stream);
536 gpr_mu_unlock(&s->mu);
// Give the queued ops a chance to progress now that headers are sent.
537 execute_from_storage(s);
// Cronet callback delivering the response headers.
// Parameters: stream is the Cronet stream; headers is the received header
// array; negotiated_protocol is only used for logging here.
543 static void on_response_headers_received(
544 bidirectional_stream* stream,
545 const bidirectional_stream_header_array* headers,
546 const char* negotiated_protocol) {
547 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
548 grpc_core::ExecCtx exec_ctx;
549 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
550 headers, negotiated_protocol);
551 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
553 /* Identify if this is a header or a trailer (in a trailer-only response case)
// A grpc-status key marks the array as trailers, which are handed to
// on_response_trailers_received instead.
555 for (size_t i = 0; i < headers->count; i++) {
556 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
557 on_response_trailers_received(stream, headers);
// Regular headers: store them as initial metadata and record the callback.
563 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
564 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
565 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
566 s->state.state_callback_received[OP_FAILED])) {
567 /* Do an extra read to trigger on_succeeded() callback in case connection
// The read targets the inline prefix buffer and asks for exactly
// GRPC_HEADER_SIZE_IN_BYTES bytes.
569 GPR_ASSERT(s->state.rs.length_field_received == false);
570 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
571 s->state.rs.compressed = false;
572 s->state.rs.received_bytes = 0;
573 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
574 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
575 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
576 s->state.rs.remaining_bytes);
577 s->state.pending_read_from_cronet = true;
579 gpr_mu_unlock(&s->mu);
580 execute_from_storage(s);
// Cronet callback signalling that a write finished. The data argument is only
// used for logging here.
586 static void on_write_completed(bidirectional_stream* stream, const char* data) {
587 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
588 grpc_core::ExecCtx exec_ctx;
589 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
590 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
// The frame buffer is no longer needed once Cronet reports completion.
592 if (s->state.ws.write_buffer) {
593 gpr_free(s->state.ws.write_buffer);
594 s->state.ws.write_buffer = nullptr;
// Record the acknowledgement and let the queued ops progress.
596 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
597 gpr_mu_unlock(&s->mu);
598 execute_from_storage(s);
// Cronet callback signalling that a read finished. The visible code tests
// count for being positive and treats any other value as the end of the read
// side of the stream.
604 static void on_read_completed(bidirectional_stream* stream, char* data,
606 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
607 grpc_core::ExecCtx exec_ctx;
608 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
609 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
612 s->state.pending_read_from_cronet = false;
613 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
// Flush-read state: the bytes are discarded and another read into the same
// scratch buffer is issued right away.
614 if (count > 0 && s->state.flush_read) {
615 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
616 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
617 GRPC_FLUSH_READ_SIZE);
618 s->state.pending_read_from_cronet = true;
619 gpr_mu_unlock(&s->mu);
// Normal state: account for the bytes and, while the current read is still
// incomplete, ask Cronet for the remainder at the proper offset.
620 } else if (count > 0) {
621 s->state.rs.received_bytes += count;
622 s->state.rs.remaining_bytes -= count;
623 if (s->state.rs.remaining_bytes > 0) {
624 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
625 s->state.state_op_done[OP_READ_REQ_MADE] = true;
626 bidirectional_stream_read(
627 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
628 s->state.rs.remaining_bytes);
629 s->state.pending_read_from_cronet = true;
630 gpr_mu_unlock(&s->mu);
// The expected number of bytes has arrived: let the queued ops consume it.
632 gpr_mu_unlock(&s->mu);
633 execute_from_storage(s);
// No bytes delivered: release the buffer and mark the read side closed.
636 null_and_maybe_free_read_buffer(s);
637 s->state.rs.read_stream_closed = true;
638 gpr_mu_unlock(&s->mu);
639 execute_from_storage(s);
// Cronet callback delivering the response trailers. It is also called from
// on_response_headers_received when the header array carries grpc-status.
646 static void on_response_trailers_received(
647 bidirectional_stream* stream,
648 const bidirectional_stream_header_array* trailers) {
649 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
650 grpc_core::ExecCtx exec_ctx;
651 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
653 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
654 grpc_cronet_transport* t = s->curr_ct;
// Store the trailers; they count as valid only when at least one arrived.
656 s->state.rs.trailing_metadata_valid = false;
657 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
658 if (trailers->count > 0) {
659 s->state.rs.trailing_metadata_valid = true;
// A grpc-status value other than the string 0 puts the stream in fail state.
661 for (size_t i = 0; i < trailers->count; i++) {
662 if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
663 0 != strcmp(trailers->headers[i].value, "0")) {
664 s->state.fail_state = true;
668 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
669 /* Send an EOS when server terminates the stream (testServerFinishesRequest) to
670 * trigger on_succeeded */
// The empty end-of-stream write is skipped when trailing metadata was already
// sent or when the stream is canceled or failed.
671 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
672 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
673 s->state.state_callback_received[OP_FAILED])) {
674 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
675 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
676 bidirectional_stream_write(s->cbs, "", 0, true);
677 if (t->use_packet_coalescing) {
678 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
679 bidirectional_stream_flush(s->cbs);
681 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
683 gpr_mu_unlock(&s->mu);
685 gpr_mu_unlock(&s->mu);
686 execute_from_storage(s);
691 Utility function that takes the data from s->write_slice_buffer and assembles
692 into a contiguous byte stream with 5 byte gRPC header prepended.
// Parameters: write_slice_buffer supplies the payload (only its first slice
// is taken); pp_write_buffer receives the newly allocated frame;
// p_write_buffer_size receives the frame size including the prefix; flags
// supplies the GRPC_WRITE_INTERNAL_COMPRESS bit.
694 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
695 char** pp_write_buffer,
696 size_t* p_write_buffer_size, uint32_t flags) {
697 grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
698 size_t length = GRPC_SLICE_LENGTH(slice);
699 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
700 /* This is freed in the on_write_completed callback */
702 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
703 *pp_write_buffer = write_buffer;
704 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
705 /* Append 5 byte header */
706 /* Compressed flag */
707 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
// Payload length, most significant byte first. Only the low 32 bits of
// length are written.
709 *p++ = static_cast<uint8_t>(length >> 24);
710 *p++ = static_cast<uint8_t>(length >> 16);
711 *p++ = static_cast<uint8_t>(length >> 8);
712 *p++ = static_cast<uint8_t>(length);
713 /* append actual data */
714 memcpy(p, GRPC_SLICE_START_PTR(slice), length);
// The payload has been copied, so the slice reference can be dropped.
715 grpc_slice_unref_internal(slice);
719 Convert metadata in a format that Cronet can consume
// Parameters: metadata is the batch to convert; host is combined with the
// path value to form the URL written to pp_url; pp_headers receives the
// allocated header array and p_num_headers the number of entries filled in;
// method is an output related to the method header (its assignment is not
// visible in this view).
721 static void convert_metadata_to_cronet_headers(
722 grpc_metadata_batch* metadata, const char* host, char** pp_url,
723 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
724 const char** method) {
725 grpc_linked_mdelem* curr = metadata->list.head;
726 /* Walk the linked list and get number of header fields */
727 size_t num_headers_available = 0;
728 while (curr != nullptr) {
730 num_headers_available++;
// A finite deadline adds one more header (the timeout header built below).
732 grpc_millis deadline = metadata->deadline;
733 if (deadline != GRPC_MILLIS_INF_FUTURE) {
734 num_headers_available++;
736 /* Allocate enough memory. It is freed in the on_stream_ready callback
738 bidirectional_stream_header* headers =
739 static_cast<bidirectional_stream_header*>(gpr_malloc(
740 sizeof(bidirectional_stream_header) * num_headers_available));
741 *pp_headers = headers;
743 /* Walk the linked list again, this time copying the header fields.
744 s->num_headers can be less than num_headers_available, as some headers
745 are not used for cronet.
746 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
748 curr = metadata->list.head;
749 size_t num_headers = 0;
750 while (num_headers < num_headers_available) {
751 grpc_mdelem mdelem = curr->md;
// Keys and values are copied into newly allocated C strings; binary header
// values are base64 encoded first.
753 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
755 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
756 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
757 value = grpc_slice_to_c_string(wire_value);
758 grpc_slice_unref_internal(wire_value);
760 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
762 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
763 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
764 GRPC_MDSTR_AUTHORITY)) {
765 /* Cronet populates these fields on its own */
770 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
771 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
774 /* POST is the default method */
781 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
782 /* Create URL by appending :path value to the hostname */
783 gpr_asprintf(pp_url, "https://%s%s", host, value);
// Every remaining header is stored in the output array.
788 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
789 headers[num_headers].key = key;
790 headers[num_headers].value = value;
792 if (curr == nullptr) {
// Append the timeout header, encoding the time left until the deadline.
796 if (deadline != GRPC_MILLIS_INF_FUTURE) {
797 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
799 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
800 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
802 headers[num_headers].key = key;
803 headers[num_headers].value = value;
808 *p_num_headers = num_headers;
/*
  Decode the 5 byte gRPC message prefix that starts at data.

  data       - pointer to at least GRPC_HEADER_SIZE_IN_BYTES (5) readable bytes:
               one flag byte followed by a four byte big-endian length.
  length     - out: payload length taken from bytes 1..4.
  compressed - out: true when the low bit of the flag byte is set.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  /* Byte 0 carries the flags; only the compressed bit is consulted. */
  *compressed = ((data[0] & 0x01) == 0x01);
  /* Assemble the length in an unsigned 32 bit value. Shifting the int-promoted
     byte left by 24, as the previous code did, overflows a signed int whenever
     the most significant length byte is >= 0x80. */
  const uint32_t wire_length = (static_cast<uint32_t>(data[1]) << 24) |
                               (static_cast<uint32_t>(data[2]) << 16) |
                               (static_cast<uint32_t>(data[3]) << 8) |
                               static_cast<uint32_t>(data[4]);
  /* The out parameter stays an int for the callers; values above INT_MAX keep
     the same bit pattern they had before. */
  *length = static_cast<int>(wire_length);
}
// Walks the metadata list that starts at head and compares each key against
// the static authority key (GRPC_MDSTR_AUTHORITY).
823 static bool header_has_authority(grpc_linked_mdelem* head) {
824 while (head != nullptr) {
825 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
826 GRPC_MDSTR_AUTHORITY)) {
835 Op Execution: Decide if one of the actions contained in the stream op can be
836 executed. This is the heart of the state machine.
// Parameters: curr_op is the batch being examined; s is the stream, whose
// s->state holds the stream-wide progress; op_state holds the progress of this
// particular batch; op_id selects the action being asked about. The decision
// is logged through CRONET_LOG as YES or NO.
838 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
839 struct stream_obj* s, struct op_state* op_state,
840 enum e_op_id op_id) {
841 struct op_state* stream_state = &s->state;
842 grpc_cronet_transport* t = s->curr_ct;
844 /* When call is canceled, every op can be run, except under following
// Canceled means a cancel op was executed; failed means on_failed was called.
847 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
848 stream_state->state_callback_received[OP_FAILED];
849 if (is_canceled_or_failed) {
850 if (op_id == OP_SEND_INITIAL_METADATA) {
851 CRONET_LOG(GPR_DEBUG, "Because");
854 if (op_id == OP_SEND_MESSAGE) {
855 CRONET_LOG(GPR_DEBUG, "Because");
858 if (op_id == OP_SEND_TRAILING_METADATA) {
859 CRONET_LOG(GPR_DEBUG, "Because");
862 if (op_id == OP_CANCEL_ERROR) {
863 CRONET_LOG(GPR_DEBUG, "Because");
866 /* already executed */
867 if (op_id == OP_RECV_INITIAL_METADATA &&
868 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
869 CRONET_LOG(GPR_DEBUG, "Because");
// Note that OP_RECV_MESSAGE is tracked per batch (op_state), not per stream.
872 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
873 CRONET_LOG(GPR_DEBUG, "Because");
876 if (op_id == OP_RECV_TRAILING_METADATA &&
877 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
878 CRONET_LOG(GPR_DEBUG, "Because");
881 /* ON_COMPLETE can be processed if one of the following conditions is met:
882 * 1. the stream failed
883 * 2. the stream is cancelled, and the callback is received
884 * 3. the stream succeeded before cancel is effective
885 * 4. the stream is cancelled, and the stream is never started */
886 if (op_id == OP_ON_COMPLETE &&
887 !(stream_state->state_callback_received[OP_FAILED] ||
888 stream_state->state_callback_received[OP_CANCELED] ||
889 stream_state->state_callback_received[OP_SUCCEEDED] ||
890 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
891 CRONET_LOG(GPR_DEBUG, "Because");
// From here on the stream is neither canceled nor failed; each branch lists
// the conditions under which the given action must still wait.
894 } else if (op_id == OP_SEND_INITIAL_METADATA) {
895 /* already executed */
896 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
897 } else if (op_id == OP_RECV_INITIAL_METADATA) {
898 /* already executed */
899 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
900 /* we haven't sent headers yet. */
901 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
903 /* we haven't received headers yet. */
904 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
905 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
907 } else if (op_id == OP_SEND_MESSAGE) {
908 /* already executed (note we're checking op specific state, not stream
910 if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
911 /* we haven't sent headers yet. */
912 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
914 } else if (op_id == OP_RECV_MESSAGE) {
915 /* already executed */
916 if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
917 /* we haven't received headers yet. */
918 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
919 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
921 } else if (op_id == OP_RECV_TRAILING_METADATA) {
922 /* already executed */
923 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
924 /* we have asked for but haven't received message yet. */
925 else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
926 !stream_state->state_op_done[OP_RECV_MESSAGE])
928 /* we haven't received trailers yet. */
929 else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
931 /* we haven't received on_succeeded yet. */
932 else if (!stream_state->state_callback_received[OP_SUCCEEDED])
934 } else if (op_id == OP_SEND_TRAILING_METADATA) {
935 /* already executed */
936 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
937 /* we haven't sent initial metadata yet */
938 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
940 /* we haven't sent message yet */
941 else if (stream_state->pending_send_message &&
942 !stream_state->state_op_done[OP_SEND_MESSAGE])
944 /* we haven't got on_write_completed for the send yet */
// With packet coalescing and a write held back for the trailer, the missing
// write acknowledgement does not apply.
945 else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
946 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
947 !(t->use_packet_coalescing &&
948 stream_state->pending_write_for_trailer))
950 } else if (op_id == OP_CANCEL_ERROR) {
951 /* already executed */
952 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
953 } else if (op_id == OP_ON_COMPLETE) {
954 /* already executed (note we're checking op specific state, not stream
956 if (op_state->state_op_done[OP_ON_COMPLETE]) {
957 CRONET_LOG(GPR_DEBUG, "Because");
960 /* Check if every op that was asked for is done. */
961 /* TODO(muxi): We should not consider the recv ops here, since they
962 * have their own callbacks. We should invoke a batch's on_complete
963 * as soon as all of the batch's send ops are complete, even if
964 * there are still recv ops pending. */
965 else if (curr_op->send_initial_metadata &&
966 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
967 CRONET_LOG(GPR_DEBUG, "Because");
969 } else if (curr_op->send_message &&
970 !op_state->state_op_done[OP_SEND_MESSAGE]) {
971 CRONET_LOG(GPR_DEBUG, "Because");
973 } else if (curr_op->send_message &&
974 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
975 CRONET_LOG(GPR_DEBUG, "Because");
977 } else if (curr_op->send_trailing_metadata &&
978 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
979 CRONET_LOG(GPR_DEBUG, "Because");
981 } else if (curr_op->recv_initial_metadata &&
982 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
983 CRONET_LOG(GPR_DEBUG, "Because");
985 } else if (curr_op->recv_message &&
986 !op_state->state_op_done[OP_RECV_MESSAGE]) {
987 CRONET_LOG(GPR_DEBUG, "Because");
989 } else if (curr_op->cancel_stream &&
990 !stream_state->state_callback_received[OP_CANCELED]) {
991 CRONET_LOG(GPR_DEBUG, "Because");
993 } else if (curr_op->recv_trailing_metadata) {
994 /* We aren't done with trailing metadata yet */
995 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
996 CRONET_LOG(GPR_DEBUG, "Because");
999 /* We've asked for actual message in an earlier op, and it hasn't been
1001 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1002 /* If this op is not the one asking for read, (which means some earlier
1003 op has asked), and the read hasn't been delivered. */
1004 if (!curr_op->recv_message &&
1005 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1006 CRONET_LOG(GPR_DEBUG, "Because");
1011 /* We should see at least one on_write_completed for the trailers that we
1013 else if (curr_op->send_trailing_metadata &&
1014 !stream_state->state_callback_received[OP_SEND_MESSAGE])
// Log the verdict for this action.
1017 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1018 result ? "YES" : "NO");
/*
  TODO (makdharma): Break this function down into smaller chunks for
  readability.
*/
1025 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1026 grpc_transport_stream_op_batch* stream_op = &oas->op;
1027 struct stream_obj* s = oas->s;
1028 grpc_cronet_transport* t = s->curr_ct;
1029 struct op_state* stream_state = &s->state;
1030 enum e_op_result result = NO_ACTION_POSSIBLE;
1031 if (stream_op->send_initial_metadata &&
1032 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1033 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1034 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1036 GPR_ASSERT(s->cbs == nullptr);
1037 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1039 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1040 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1041 if (t->use_packet_coalescing) {
1042 bidirectional_stream_disable_auto_flush(s->cbs, true);
1043 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1045 char* url = nullptr;
1046 const char* method = "POST";
1047 s->header_array.headers = nullptr;
1048 convert_metadata_to_cronet_headers(
1049 stream_op->payload->send_initial_metadata.send_initial_metadata,
1050 t->host, &url, &s->header_array.headers, &s->header_array.count,
1052 s->header_array.capacity = s->header_array.count;
1053 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
1054 bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
1058 unsigned int header_index;
1059 for (header_index = 0; header_index < s->header_array.count;
1061 gpr_free((void*)s->header_array.headers[header_index].key);
1062 gpr_free((void*)s->header_array.headers[header_index].value);
1064 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1065 if (t->use_packet_coalescing) {
1066 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1067 s->state.flush_cronet_when_ready = true;
1070 result = ACTION_TAKEN_WITH_CALLBACK;
1071 } else if (stream_op->send_message &&
1072 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1073 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1074 stream_state->pending_send_message = false;
1075 if (stream_state->state_callback_received[OP_FAILED]) {
1076 result = NO_ACTION_POSSIBLE;
1077 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1079 grpc_slice_buffer write_slice_buffer;
1081 grpc_slice_buffer_init(&write_slice_buffer);
1082 if (1 != stream_op->payload->send_message.send_message->Next(
1083 stream_op->payload->send_message.send_message->length(),
1085 /* Should never reach here */
1088 if (GRPC_ERROR_NONE !=
1089 stream_op->payload->send_message.send_message->Pull(&slice)) {
1090 /* Should never reach here */
1093 grpc_slice_buffer_add(&write_slice_buffer, slice);
1094 if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1095 /* Empty request not handled yet */
1096 gpr_log(GPR_ERROR, "Empty request is not supported");
1097 GPR_ASSERT(write_slice_buffer.count == 1);
1099 if (write_slice_buffer.count > 0) {
1100 size_t write_buffer_size;
1102 &write_slice_buffer, &stream_state->ws.write_buffer,
1104 stream_op->payload->send_message.send_message->flags());
1105 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1106 stream_state->ws.write_buffer);
1107 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1108 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1109 static_cast<int>(write_buffer_size), false);
1110 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1111 if (t->use_packet_coalescing) {
1112 if (!stream_op->send_trailing_metadata) {
1113 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1114 bidirectional_stream_flush(s->cbs);
1115 result = ACTION_TAKEN_WITH_CALLBACK;
1117 stream_state->pending_write_for_trailer = true;
1118 result = ACTION_TAKEN_NO_CALLBACK;
1121 result = ACTION_TAKEN_WITH_CALLBACK;
1124 result = NO_ACTION_POSSIBLE;
1127 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1128 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1129 stream_op->payload->send_message.send_message.reset();
1130 } else if (stream_op->send_trailing_metadata &&
1131 op_can_be_run(stream_op, s, &oas->state,
1132 OP_SEND_TRAILING_METADATA)) {
1133 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1134 if (stream_state->state_callback_received[OP_FAILED]) {
1135 result = NO_ACTION_POSSIBLE;
1136 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
1138 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1139 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1140 bidirectional_stream_write(s->cbs, "", 0, true);
1141 if (t->use_packet_coalescing) {
1142 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1143 bidirectional_stream_flush(s->cbs);
1145 result = ACTION_TAKEN_WITH_CALLBACK;
1147 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1148 } else if (stream_op->recv_initial_metadata &&
1149 op_can_be_run(stream_op, s, &oas->state,
1150 OP_RECV_INITIAL_METADATA)) {
1151 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1152 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1154 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1156 } else if (stream_state->state_callback_received[OP_FAILED]) {
1158 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1160 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1162 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1165 grpc_chttp2_incoming_metadata_buffer_publish(
1166 &oas->s->state.rs.initial_metadata,
1167 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1169 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1172 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1173 result = ACTION_TAKEN_NO_CALLBACK;
1174 } else if (stream_op->recv_message &&
1175 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1176 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1177 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1178 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1179 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1181 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1182 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1183 result = ACTION_TAKEN_NO_CALLBACK;
1184 } else if (stream_state->state_callback_received[OP_FAILED]) {
1185 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1186 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1188 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1189 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1190 result = ACTION_TAKEN_NO_CALLBACK;
1191 } else if (stream_state->rs.read_stream_closed == true) {
1192 /* No more data will be received */
1193 CRONET_LOG(GPR_DEBUG, "read stream closed");
1194 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1196 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1197 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1198 result = ACTION_TAKEN_NO_CALLBACK;
1199 } else if (stream_state->flush_read) {
1200 CRONET_LOG(GPR_DEBUG, "flush read");
1201 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1203 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1204 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1205 result = ACTION_TAKEN_NO_CALLBACK;
1206 } else if (stream_state->rs.length_field_received == false) {
1207 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1208 stream_state->rs.remaining_bytes == 0) {
1209 /* Start a read operation for data */
1210 stream_state->rs.length_field_received = true;
1212 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1213 &stream_state->rs.length_field, &stream_state->rs.compressed);
1214 CRONET_LOG(GPR_DEBUG, "length field = %d",
1215 stream_state->rs.length_field);
1216 if (stream_state->rs.length_field > 0) {
1217 stream_state->rs.read_buffer = static_cast<char*>(
1218 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1219 GPR_ASSERT(stream_state->rs.read_buffer);
1220 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1221 stream_state->rs.received_bytes = 0;
1222 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1223 stream_state->state_op_done[OP_READ_REQ_MADE] =
1224 true; /* Indicates that at least one read request has been made */
1225 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1226 stream_state->rs.remaining_bytes);
1227 stream_state->pending_read_from_cronet = true;
1228 result = ACTION_TAKEN_WITH_CALLBACK;
1230 stream_state->rs.remaining_bytes = 0;
1231 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1232 /* Clean up read_slice_buffer in case there is unread data. */
1233 grpc_slice_buffer_destroy_internal(
1234 &stream_state->rs.read_slice_buffer);
1235 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1237 if (stream_state->rs.compressed) {
1238 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1240 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1241 stream_op->payload->recv_message.recv_message->reset(
1242 stream_state->rs.sbs.get());
1244 stream_op->payload->recv_message.recv_message_ready,
1246 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1247 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1249 /* Extra read to trigger on_succeed */
1250 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1251 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1252 stream_state->rs.received_bytes = 0;
1253 stream_state->rs.compressed = false;
1254 stream_state->rs.length_field_received = false;
1255 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1256 stream_state->state_op_done[OP_READ_REQ_MADE] =
1257 true; /* Indicates that at least one read request has been made */
1258 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1259 stream_state->rs.remaining_bytes);
1260 stream_state->pending_read_from_cronet = true;
1261 result = ACTION_TAKEN_NO_CALLBACK;
1263 } else if (stream_state->rs.remaining_bytes == 0) {
1264 /* Start a read operation for first 5 bytes (GRPC header) */
1265 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1266 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1267 stream_state->rs.received_bytes = 0;
1268 stream_state->rs.compressed = false;
1269 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1270 stream_state->state_op_done[OP_READ_REQ_MADE] =
1271 true; /* Indicates that at least one read request has been made */
1272 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1273 stream_state->rs.remaining_bytes);
1274 stream_state->pending_read_from_cronet = true;
1275 result = ACTION_TAKEN_WITH_CALLBACK;
1277 result = NO_ACTION_POSSIBLE;
1279 } else if (stream_state->rs.remaining_bytes == 0) {
1280 CRONET_LOG(GPR_DEBUG, "read operation complete");
1281 grpc_slice read_data_slice =
1282 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1283 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1284 memcpy(dst_p, stream_state->rs.read_buffer,
1285 static_cast<size_t>(stream_state->rs.length_field));
1286 null_and_maybe_free_read_buffer(s);
1287 /* Clean up read_slice_buffer in case there is unread data. */
1288 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1289 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1290 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1293 if (stream_state->rs.compressed) {
1294 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1296 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1297 stream_op->payload->recv_message.recv_message->reset(
1298 stream_state->rs.sbs.get());
1299 GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
1301 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1302 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1303 /* Do an extra read to trigger on_succeeded() callback in case connection
1305 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1306 stream_state->rs.compressed = false;
1307 stream_state->rs.received_bytes = 0;
1308 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1309 stream_state->rs.length_field_received = false;
1310 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1311 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1312 stream_state->rs.remaining_bytes);
1313 stream_state->pending_read_from_cronet = true;
1314 result = ACTION_TAKEN_NO_CALLBACK;
1316 } else if (stream_op->recv_trailing_metadata &&
1317 op_can_be_run(stream_op, s, &oas->state,
1318 OP_RECV_TRAILING_METADATA)) {
1319 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1320 grpc_error* error = GRPC_ERROR_NONE;
1321 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1322 error = GRPC_ERROR_REF(stream_state->cancel_error);
1323 } else if (stream_state->state_callback_received[OP_FAILED]) {
1324 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1325 } else if (oas->s->state.rs.trailing_metadata_valid) {
1326 grpc_chttp2_incoming_metadata_buffer_publish(
1327 &oas->s->state.rs.trailing_metadata,
1328 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1329 stream_state->rs.trailing_metadata_valid = false;
1332 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1334 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1335 result = ACTION_TAKEN_NO_CALLBACK;
1336 } else if (stream_op->cancel_stream &&
1337 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1338 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1340 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1341 bidirectional_stream_cancel(s->cbs);
1342 result = ACTION_TAKEN_WITH_CALLBACK;
1344 result = ACTION_TAKEN_NO_CALLBACK;
1346 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1347 if (!stream_state->cancel_error) {
1348 stream_state->cancel_error =
1349 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1351 } else if (stream_op->on_complete &&
1352 op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1353 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1354 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1355 GRPC_CLOSURE_SCHED(stream_op->on_complete,
1356 GRPC_ERROR_REF(stream_state->cancel_error));
1357 } else if (stream_state->state_callback_received[OP_FAILED]) {
1359 stream_op->on_complete,
1360 make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1362 /* All actions in this stream_op are complete. Call the on_complete
1365 GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
1367 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1369 /* reset any send message state, only if this ON_COMPLETE is about a send.
1371 if (stream_op->send_message) {
1372 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1373 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1375 result = ACTION_TAKEN_NO_CALLBACK;
1376 /* If this is the on_complete callback being called for a received message -
1378 if (stream_op->recv_message)
1379 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1381 result = NO_ACTION_POSSIBLE;
/*
  Functions used by upper layers to access transport functionality.
*/
1390 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1391 grpc_stream_refcount* refcount,
1392 grpc_core::Arena* arena)
1394 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1397 refcount(refcount) {
1398 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1402 inline stream_obj::~stream_obj() {
1403 null_and_maybe_free_read_buffer(this);
1404 /* Clean up read_slice_buffer in case there is unread data. */
1405 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1406 GRPC_ERROR_UNREF(state.cancel_error);
1409 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1410 grpc_stream_refcount* refcount, const void* server_data,
1411 grpc_core::Arena* arena) {
1412 new (gs) stream_obj(gt, gs, refcount, arena);
1416 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1417 grpc_pollset* pollset) {}
1419 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1420 grpc_pollset_set* pollset_set) {}
1422 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1423 grpc_transport_stream_op_batch* op) {
1424 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1425 if (op->send_initial_metadata &&
1426 header_has_authority(op->payload->send_initial_metadata
1427 .send_initial_metadata->list.head)) {
1428 /* Cronet does not support :authority header field. We cancel the call when
1429 this field is present in metadata */
1430 if (op->recv_initial_metadata) {
1432 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1433 GRPC_ERROR_CANCELLED);
1435 if (op->recv_message) {
1436 GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1437 GRPC_ERROR_CANCELLED);
1439 if (op->recv_trailing_metadata) {
1441 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1442 GRPC_ERROR_CANCELLED);
1444 GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1447 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1448 add_to_storage(s, op);
1449 execute_from_storage(s);
1452 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1453 grpc_closure* then_schedule_closure) {
1454 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1456 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1459 static void destroy_transport(grpc_transport* gt) {}
1461 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1463 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1465 static const grpc_transport_vtable grpc_cronet_vtable = {
1469 set_pollset_do_nothing,
1470 set_pollset_set_do_nothing,
1477 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1478 const grpc_channel_args* args,
1480 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1481 gpr_malloc(sizeof(grpc_cronet_transport)));
1485 ct->base.vtable = &grpc_cronet_vtable;
1486 ct->engine = static_cast<stream_engine*>(engine);
1487 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1491 strcpy(ct->host, target);
1493 ct->use_packet_coalescing = true;
1495 for (size_t i = 0; i < args->num_args; i++) {
1497 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1498 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1499 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1500 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1502 ct->use_packet_coalescing = (args->args[i].value.integer != 0);