Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / ext / transport / cronet / transport / cronet_transport.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/surface/validate_metadata.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/timeout_encoding.h"
44 #include "src/core/lib/transport/transport_impl.h"
45
46 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
47
48 #define GRPC_HEADER_SIZE_IN_BYTES 5
49 #define GRPC_FLUSH_READ_SIZE 4096
50
/* Trace flag registered under the name "cronet", created with an initial
   value of false. */
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
/* Forwards its arguments to gpr_log() only while grpc_cronet_trace is enabled.
   The do/while(0) wrapper makes the expansion behave as a single statement. */
#define CRONET_LOG(...)                                    \
  do {                                                     \
    if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)
56
/* Outcome of one execute_stream_op() call, as consumed by
   execute_from_storage(). */
enum e_op_result {
  /* An action was started; processing stops until a callback arrives. */
  ACTION_TAKEN_WITH_CALLBACK,
  /* An action was taken; the same op is processed again immediately. */
  ACTION_TAKEN_NO_CALLBACK,
  /* Nothing could be done for this op; the next stored op is tried. */
  NO_ACTION_POSSIBLE
};
62
/* Identifiers used as indices into op_state::state_op_done[] and
   op_state::state_callback_received[]. The values are contiguous from 0 and
   OP_NUM_OPS gives the size of those arrays. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  OP_READ_REQ_MADE,
  OP_NUM_OPS /* count of the entries above; not a real op */
};
79
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */

/* Forward declarations so the callback table below can reference the
   handlers, which are defined later in this file. */
static void on_stream_ready(bidirectional_stream*);
static void on_response_headers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*,
    const char*);
static void on_write_completed(bidirectional_stream*, const char*);
static void on_read_completed(bidirectional_stream*, char*, int);
static void on_response_trailers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*);
static void on_succeeded(bidirectional_stream*);
static void on_failed(bidirectional_stream*, int);
static void on_canceled(bidirectional_stream*);
/* Callback table for Cronet. The initializer is positional, so the order of
   the entries must follow the member order of bidirectional_stream_callback
   (declared outside this file). Note that on_read_completed precedes
   on_write_completed here, unlike the declarations above. */
static bidirectional_stream_callback cronet_callbacks = {
    on_stream_ready,
    on_response_headers_received,
    on_read_completed,
    on_write_completed,
    on_response_trailers_received,
    on_succeeded,
    on_failed,
    on_canceled};
102
/* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  stream_engine* engine; /* Cronet engine handle; not used in the code shown */
  char* host; /* presumably the host used to build request URLs - confirm */
  /* When true, the callbacks in this file call bidirectional_stream_flush()
     explicitly after queuing data. */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
111
112 /* TODO (makdharma): reorder structure for memory efficiency per
113    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
/* State for data received from the server on one stream. */
struct read_state {
  /* Both metadata buffers are constructed on |arena|; the slice buffer is
     initialized empty. */
  read_state(grpc_core::Arena* arena)
      : trailing_metadata(arena), initial_metadata(arena) {
    grpc_slice_buffer_init(&read_slice_buffer);
  }

  /* vars to store data coming from server */
  /* Buffer passed to bidirectional_stream_read(). Points either at
     grpc_header_bytes or at a gpr_malloc'ed buffer; only the latter is freed
     by null_and_maybe_free_read_buffer(). */
  char* read_buffer = nullptr;
  bool length_field_received = false;
  /* Bytes received so far / still expected for the current read; updated in
     on_read_completed(). */
  int received_bytes = 0;
  int remaining_bytes = 0;
  int length_field = 0;
  bool compressed = 0;
  /* In-struct storage for the 5-byte gRPC message header. */
  char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
  char* payload_field = nullptr;
  /* Set when on_read_completed() is called with a count <= 0. */
  bool read_stream_closed = 0;

  /* vars for holding data destined for the application */
  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
  grpc_slice_buffer read_slice_buffer;

  /* vars for trailing metadata */
  grpc_chttp2_incoming_metadata_buffer trailing_metadata;
  /* True once a trailers callback delivered at least one entry. */
  bool trailing_metadata_valid = false;

  /* vars for initial metadata */
  grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
142
/* State for data being sent to the server on one stream. */
struct write_state {
  /* Heap buffer for the frame being written; freed (and reset to nullptr) in
     on_write_completed(), on_failed() and on_canceled(). */
  char* write_buffer = nullptr;
};
146
/* track state of one stream op */
struct op_state {
  /* |arena| is handed to the embedded read_state. */
  op_state(grpc_core::Arena* arena) : rs(arena) {}

  /* Per-op flags indexed by e_op_id: whether the op has been carried out, and
     whether the matching Cronet callback has been received. */
  bool state_op_done[OP_NUM_OPS] = {};
  bool state_callback_received[OP_NUM_OPS] = {};
  /* A non-zero gRPC status code has been seen */
  bool fail_state = false;
  /* Transport is discarding all buffered messages */
  bool flush_read = false;
  /* Checked in on_stream_ready(): flush the Cronet stream once it is ready
     (only when packet coalescing is enabled). */
  bool flush_cronet_when_ready = false;
  bool pending_write_for_trailer = false;
  /* Set in add_to_storage() when a batch carrying send_message is queued. */
  bool pending_send_message = false;
  /* User requested RECV_TRAILING_METADATA */
  bool pending_recv_trailing_metadata = false;
  /* Cronet has not issued a callback of a bidirectional read */
  bool pending_read_from_cronet = false;
  grpc_error* cancel_error = GRPC_ERROR_NONE;
  /* data structure for storing data coming from server */
  struct read_state rs;
  /* data structure for storing data going to the server */
  struct write_state ws;
};
170
171 struct stream_obj;
172
/* One queued stream op batch together with its per-op state; a node of the
   singly linked list rooted at op_storage::head. */
struct op_and_state {
  /* Stores a copy of |op| and ties the node to stream |s|. */
  op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);

  grpc_transport_stream_op_batch op; /* copy of the submitted batch */
  struct op_state state;
  /* When true after execute_stream_op(), execute_from_storage() removes and
     frees this node. */
  bool done = false;
  struct stream_obj* s; /* Pointer back to the stream object */
  /* next op_and_state in the linked list */
  struct op_and_state* next = nullptr;
};
183
/* Per-stream list of pending ops. New ops are pushed at the head by
   add_to_storage() and unlinked by remove_from_storage(). */
struct op_storage {
  int num_pending_ops = 0; /* number of nodes currently in the list */
  struct op_and_state* head = nullptr;
};
188
/* Per-stream object. Cronet callbacks recover it from
   bidirectional_stream::annotation. */
struct stream_obj {
  stream_obj(grpc_transport* gt, grpc_stream* gs,
             grpc_stream_refcount* refcount, grpc_core::Arena* arena);
  ~stream_obj();

  /* Arena passed on to the op_state of every queued op. */
  grpc_core::Arena* arena;
  struct op_and_state* oas = nullptr;
  grpc_transport_stream_op_batch* curr_op = nullptr;
  grpc_cronet_transport* curr_ct;
  grpc_stream* curr_gs;
  /* Cronet stream handle; destroyed and reset to nullptr by the on_failed,
     on_canceled and on_succeeded callbacks. */
  bidirectional_stream* cbs = nullptr;
  /* Request headers handed to Cronet; the headers array is freed in
     on_stream_ready(), on_failed() and on_canceled(). */
  bidirectional_stream_header_array header_array =
      bidirectional_stream_header_array();  // Zero-initialize the structure.

  /* Stream level state. Some state will be tracked both at stream and stream_op
   * level */
  struct op_state state;

  /* OP storage */
  struct op_storage storage;

  /* Mutex to protect storage. The Cronet callbacks in this file also hold it
     while they update |state|. */
  gpr_mu mu;

  /* Refcount object of the stream */
  grpc_stream_refcount* refcount;
};
216
/* Stream ref-count helpers. When NDEBUG is not defined, the |reason| string is
   forwarded to grpc_stream_ref()/grpc_stream_unref(). Otherwise the macros
   drop |reason| and the one-argument helpers are used. */
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
  grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream), (reason))
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream))
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
235
236 static enum e_op_result execute_stream_op(struct op_and_state* oas);
237
238 /*
239   Utility function to translate enum into string for printing
240 */
241 static const char* op_result_string(enum e_op_result i) {
242   switch (i) {
243     case ACTION_TAKEN_WITH_CALLBACK:
244       return "ACTION_TAKEN_WITH_CALLBACK";
245     case ACTION_TAKEN_NO_CALLBACK:
246       return "ACTION_TAKEN_NO_CALLBACK";
247     case NO_ACTION_POSSIBLE:
248       return "NO_ACTION_POSSIBLE";
249   }
250   GPR_UNREACHABLE_CODE(return "UNKNOWN");
251 }
252
253 static const char* op_id_string(enum e_op_id i) {
254   switch (i) {
255     case OP_SEND_INITIAL_METADATA:
256       return "OP_SEND_INITIAL_METADATA";
257     case OP_SEND_MESSAGE:
258       return "OP_SEND_MESSAGE";
259     case OP_SEND_TRAILING_METADATA:
260       return "OP_SEND_TRAILING_METADATA";
261     case OP_RECV_MESSAGE:
262       return "OP_RECV_MESSAGE";
263     case OP_RECV_INITIAL_METADATA:
264       return "OP_RECV_INITIAL_METADATA";
265     case OP_RECV_TRAILING_METADATA:
266       return "OP_RECV_TRAILING_METADATA";
267     case OP_CANCEL_ERROR:
268       return "OP_CANCEL_ERROR";
269     case OP_ON_COMPLETE:
270       return "OP_ON_COMPLETE";
271     case OP_FAILED:
272       return "OP_FAILED";
273     case OP_SUCCEEDED:
274       return "OP_SUCCEEDED";
275     case OP_CANCELED:
276       return "OP_CANCELED";
277     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
278       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
279     case OP_READ_REQ_MADE:
280       return "OP_READ_REQ_MADE";
281     case OP_NUM_OPS:
282       return "OP_NUM_OPS";
283   }
284   return "UNKNOWN";
285 }
286
287 static void null_and_maybe_free_read_buffer(stream_obj* s) {
288   if (s->state.rs.read_buffer &&
289       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
290     gpr_free(s->state.rs.read_buffer);
291   }
292   s->state.rs.read_buffer = nullptr;
293 }
294
295 static void maybe_flush_read(stream_obj* s) {
296   /* To enter flush read state (discarding all the buffered messages in
297    * transport layer), two conditions must be satisfied: 1) non-zero grpc status
298    * has been received, and 2) an op requesting the status code
299    * (RECV_TRAILING_METADATA) is issued by the user. (See
300    * doc/status_ordering.md) */
301   /* Whenever the evaluation of any of the two condition is changed, we check
302    * whether we should enter the flush read state. */
303   if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
304     if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
305       CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
306       s->state.flush_read = true;
307       null_and_maybe_free_read_buffer(s);
308       s->state.rs.read_buffer =
309           static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
310       if (!s->state.pending_read_from_cronet) {
311         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
312         bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
313                                   GRPC_FLUSH_READ_SIZE);
314         s->state.pending_read_from_cronet = true;
315       }
316     }
317   }
318 }
319
320 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
321   grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
322   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
323   return error;
324 }
325
/* Copies |op| into the node, builds the per-op state on the stream's arena and
   records the owning stream. */
inline op_and_state::op_and_state(stream_obj* s,
                                  const grpc_transport_stream_op_batch& op)
    : op(op), state(s->arena), s(s) {}
329
330 /*
331   Add a new stream op to op storage.
332 */
333 static void add_to_storage(struct stream_obj* s,
334                            grpc_transport_stream_op_batch* op) {
335   struct op_storage* storage = &s->storage;
336   /* add new op at the beginning of the linked list. The memory is freed
337   in remove_from_storage */
338   op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
339   gpr_mu_lock(&s->mu);
340   new_op->next = storage->head;
341   storage->head = new_op;
342   storage->num_pending_ops++;
343   if (op->send_message) {
344     s->state.pending_send_message = true;
345   }
346   if (op->recv_trailing_metadata) {
347     s->state.pending_recv_trailing_metadata = true;
348     maybe_flush_read(s);
349   }
350   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
351              storage->num_pending_ops);
352   gpr_mu_unlock(&s->mu);
353 }
354
355 /*
356   Traverse the linked list and delete op and free memory
357 */
358 static void remove_from_storage(struct stream_obj* s,
359                                 struct op_and_state* oas) {
360   struct op_and_state* curr;
361   if (s->storage.head == nullptr || oas == nullptr) {
362     return;
363   }
364   if (s->storage.head == oas) {
365     s->storage.head = oas->next;
366     grpc_core::Delete(oas);
367     s->storage.num_pending_ops--;
368     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
369                s->storage.num_pending_ops);
370   } else {
371     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
372       if (curr->next == oas) {
373         curr->next = oas->next;
374         s->storage.num_pending_ops--;
375         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
376                    s->storage.num_pending_ops);
377         grpc_core::Delete(oas);
378         break;
379       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
380         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
381       }
382     }
383   }
384 }
385
386 /*
387   Cycle through ops and try to take next action. Break when either
388   an action with callback is taken, or no action is possible.
389   This can get executed from the Cronet network thread via cronet callback
390   or on the application supplied thread via the perform_stream_op function.
391 */
392 static void execute_from_storage(stream_obj* s) {
393   gpr_mu_lock(&s->mu);
394   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
395     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
396     GPR_ASSERT(!curr->done);
397     enum e_op_result result = execute_stream_op(curr);
398     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
399                op_result_string(result));
400     /* if this op is done, then remove it and free memory */
401     if (curr->done) {
402       struct op_and_state* next = curr->next;
403       remove_from_storage(s, curr);
404       curr = next;
405     } else if (result == NO_ACTION_POSSIBLE) {
406       curr = curr->next;
407     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
408       /* wait for the callback */
409       break;
410     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
411   }
412   gpr_mu_unlock(&s->mu);
413 }
414
415 static void convert_cronet_array_to_metadata(
416     const bidirectional_stream_header_array* header_array,
417     grpc_chttp2_incoming_metadata_buffer* mds) {
418   for (size_t i = 0; i < header_array->count; i++) {
419     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
420                header_array->headers[i].key, header_array->headers[i].value);
421     grpc_slice key = grpc_slice_intern(
422         grpc_slice_from_static_string(header_array->headers[i].key));
423     grpc_slice value;
424     if (grpc_is_refcounted_slice_binary_header(key)) {
425       value = grpc_slice_from_static_string(header_array->headers[i].value);
426       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
427           value, grpc_chttp2_base64_infer_length_after_decode(value)));
428     } else {
429       value = grpc_slice_intern(
430           grpc_slice_from_static_string(header_array->headers[i].value));
431     }
432     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
433                       grpc_chttp2_incoming_metadata_buffer_add(
434                           mds, grpc_mdelem_from_slices(key, value)));
435   }
436 }
437
438 /*
439   Cronet callback
440 */
441 static void on_failed(bidirectional_stream* stream, int net_error) {
442   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
443   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
444   grpc_core::ExecCtx exec_ctx;
445
446   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
447   gpr_mu_lock(&s->mu);
448   bidirectional_stream_destroy(s->cbs);
449   s->state.state_callback_received[OP_FAILED] = true;
450   s->cbs = nullptr;
451   if (s->header_array.headers) {
452     gpr_free(s->header_array.headers);
453     s->header_array.headers = nullptr;
454   }
455   if (s->state.ws.write_buffer) {
456     gpr_free(s->state.ws.write_buffer);
457     s->state.ws.write_buffer = nullptr;
458   }
459   null_and_maybe_free_read_buffer(s);
460   gpr_mu_unlock(&s->mu);
461   execute_from_storage(s);
462   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
463 }
464
465 /*
466   Cronet callback
467 */
468 static void on_canceled(bidirectional_stream* stream) {
469   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
470   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
471   grpc_core::ExecCtx exec_ctx;
472
473   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
474   gpr_mu_lock(&s->mu);
475   bidirectional_stream_destroy(s->cbs);
476   s->state.state_callback_received[OP_CANCELED] = true;
477   s->cbs = nullptr;
478   if (s->header_array.headers) {
479     gpr_free(s->header_array.headers);
480     s->header_array.headers = nullptr;
481   }
482   if (s->state.ws.write_buffer) {
483     gpr_free(s->state.ws.write_buffer);
484     s->state.ws.write_buffer = nullptr;
485   }
486   null_and_maybe_free_read_buffer(s);
487   gpr_mu_unlock(&s->mu);
488   execute_from_storage(s);
489   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
490 }
491
492 /*
493   Cronet callback
494 */
495 static void on_succeeded(bidirectional_stream* stream) {
496   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
497   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
498   grpc_core::ExecCtx exec_ctx;
499
500   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
501   gpr_mu_lock(&s->mu);
502   bidirectional_stream_destroy(s->cbs);
503   s->state.state_callback_received[OP_SUCCEEDED] = true;
504   s->cbs = nullptr;
505   null_and_maybe_free_read_buffer(s);
506   gpr_mu_unlock(&s->mu);
507   execute_from_storage(s);
508   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
509 }
510
511 /*
512   Cronet callback
513 */
514 static void on_stream_ready(bidirectional_stream* stream) {
515   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
516   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
517   grpc_core::ExecCtx exec_ctx;
518   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
519   grpc_cronet_transport* t = s->curr_ct;
520   gpr_mu_lock(&s->mu);
521   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
522   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
523   /* Free the memory allocated for headers */
524   if (s->header_array.headers) {
525     gpr_free(s->header_array.headers);
526     s->header_array.headers = nullptr;
527   }
528   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
529    * SEND_TRAILING_METADATA ops pending */
530   if (t->use_packet_coalescing) {
531     if (s->state.flush_cronet_when_ready) {
532       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
533       bidirectional_stream_flush(stream);
534     }
535   }
536   gpr_mu_unlock(&s->mu);
537   execute_from_storage(s);
538 }
539
540 /*
541   Cronet callback
542 */
543 static void on_response_headers_received(
544     bidirectional_stream* stream,
545     const bidirectional_stream_header_array* headers,
546     const char* negotiated_protocol) {
547   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
548   grpc_core::ExecCtx exec_ctx;
549   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
550              headers, negotiated_protocol);
551   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
552
553   /* Identify if this is a header or a trailer (in a trailer-only response case)
554    */
555   for (size_t i = 0; i < headers->count; i++) {
556     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
557       on_response_trailers_received(stream, headers);
558       return;
559     }
560   }
561
562   gpr_mu_lock(&s->mu);
563   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
564   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
565   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
566         s->state.state_callback_received[OP_FAILED])) {
567     /* Do an extra read to trigger on_succeeded() callback in case connection
568      is closed */
569     GPR_ASSERT(s->state.rs.length_field_received == false);
570     s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
571     s->state.rs.compressed = false;
572     s->state.rs.received_bytes = 0;
573     s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
574     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
575     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
576                               s->state.rs.remaining_bytes);
577     s->state.pending_read_from_cronet = true;
578   }
579   gpr_mu_unlock(&s->mu);
580   execute_from_storage(s);
581 }
582
583 /*
584   Cronet callback
585 */
586 static void on_write_completed(bidirectional_stream* stream, const char* data) {
587   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
588   grpc_core::ExecCtx exec_ctx;
589   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
590   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
591   gpr_mu_lock(&s->mu);
592   if (s->state.ws.write_buffer) {
593     gpr_free(s->state.ws.write_buffer);
594     s->state.ws.write_buffer = nullptr;
595   }
596   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
597   gpr_mu_unlock(&s->mu);
598   execute_from_storage(s);
599 }
600
601 /*
602   Cronet callback
603 */
604 static void on_read_completed(bidirectional_stream* stream, char* data,
605                               int count) {
606   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
607   grpc_core::ExecCtx exec_ctx;
608   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
609   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
610              count);
611   gpr_mu_lock(&s->mu);
612   s->state.pending_read_from_cronet = false;
613   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
614   if (count > 0 && s->state.flush_read) {
615     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
616     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
617                               GRPC_FLUSH_READ_SIZE);
618     s->state.pending_read_from_cronet = true;
619     gpr_mu_unlock(&s->mu);
620   } else if (count > 0) {
621     s->state.rs.received_bytes += count;
622     s->state.rs.remaining_bytes -= count;
623     if (s->state.rs.remaining_bytes > 0) {
624       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
625       s->state.state_op_done[OP_READ_REQ_MADE] = true;
626       bidirectional_stream_read(
627           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
628           s->state.rs.remaining_bytes);
629       s->state.pending_read_from_cronet = true;
630       gpr_mu_unlock(&s->mu);
631     } else {
632       gpr_mu_unlock(&s->mu);
633       execute_from_storage(s);
634     }
635   } else {
636     null_and_maybe_free_read_buffer(s);
637     s->state.rs.read_stream_closed = true;
638     gpr_mu_unlock(&s->mu);
639     execute_from_storage(s);
640   }
641 }
642
643 /*
644   Cronet callback
645 */
646 static void on_response_trailers_received(
647     bidirectional_stream* stream,
648     const bidirectional_stream_header_array* trailers) {
649   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
650   grpc_core::ExecCtx exec_ctx;
651   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
652              trailers);
653   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
654   grpc_cronet_transport* t = s->curr_ct;
655   gpr_mu_lock(&s->mu);
656   s->state.rs.trailing_metadata_valid = false;
657   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
658   if (trailers->count > 0) {
659     s->state.rs.trailing_metadata_valid = true;
660   }
661   for (size_t i = 0; i < trailers->count; i++) {
662     if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
663         0 != strcmp(trailers->headers[i].value, "0")) {
664       s->state.fail_state = true;
665       maybe_flush_read(s);
666     }
667   }
668   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
669   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
670    * trigger on_succeeded */
671   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
672       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
673         s->state.state_callback_received[OP_FAILED])) {
674     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
675     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
676     bidirectional_stream_write(s->cbs, "", 0, true);
677     if (t->use_packet_coalescing) {
678       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
679       bidirectional_stream_flush(s->cbs);
680     }
681     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
682
683     gpr_mu_unlock(&s->mu);
684   } else {
685     gpr_mu_unlock(&s->mu);
686     execute_from_storage(s);
687   }
688 }
689
690 /*
691  Utility function that takes the data from s->write_slice_buffer and assembles
692  into a contiguous byte stream with 5 byte gRPC header prepended.
693 */
694 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
695                               char** pp_write_buffer,
696                               size_t* p_write_buffer_size, uint32_t flags) {
697   grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
698   size_t length = GRPC_SLICE_LENGTH(slice);
699   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
700   /* This is freed in the on_write_completed callback */
701   char* write_buffer =
702       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
703   *pp_write_buffer = write_buffer;
704   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
705   /* Append 5 byte header */
706   /* Compressed flag */
707   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
708   /* Message length */
709   *p++ = static_cast<uint8_t>(length >> 24);
710   *p++ = static_cast<uint8_t>(length >> 16);
711   *p++ = static_cast<uint8_t>(length >> 8);
712   *p++ = static_cast<uint8_t>(length);
713   /* append actual data */
714   memcpy(p, GRPC_SLICE_START_PTR(slice), length);
715   grpc_slice_unref_internal(slice);
716 }
717
718 /*
719  Convert metadata in a format that Cronet can consume
720 */
721 static void convert_metadata_to_cronet_headers(
722     grpc_metadata_batch* metadata, const char* host, char** pp_url,
723     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
724     const char** method) {
725   grpc_linked_mdelem* curr = metadata->list.head;
726   /* Walk the linked list and get number of header fields */
727   size_t num_headers_available = 0;
728   while (curr != nullptr) {
729     curr = curr->next;
730     num_headers_available++;
731   }
732   grpc_millis deadline = metadata->deadline;
733   if (deadline != GRPC_MILLIS_INF_FUTURE) {
734     num_headers_available++;
735   }
736   /* Allocate enough memory. It is freed in the on_stream_ready callback
737    */
738   bidirectional_stream_header* headers =
739       static_cast<bidirectional_stream_header*>(gpr_malloc(
740           sizeof(bidirectional_stream_header) * num_headers_available));
741   *pp_headers = headers;
742
743   /* Walk the linked list again, this time copying the header fields.
744     s->num_headers can be less than num_headers_available, as some headers
745     are not used for cronet.
746     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
747    */
748   curr = metadata->list.head;
749   size_t num_headers = 0;
750   while (num_headers < num_headers_available) {
751     grpc_mdelem mdelem = curr->md;
752     curr = curr->next;
753     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
754     char* value;
755     if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
756       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
757       value = grpc_slice_to_c_string(wire_value);
758       grpc_slice_unref_internal(wire_value);
759     } else {
760       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
761     }
762     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
763         grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
764                                       GRPC_MDSTR_AUTHORITY)) {
765       /* Cronet populates these fields on its own */
766       gpr_free(key);
767       gpr_free(value);
768       continue;
769     }
770     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
771       if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
772         *method = "PUT";
773       } else {
774         /* POST method in default*/
775         *method = "POST";
776       }
777       gpr_free(key);
778       gpr_free(value);
779       continue;
780     }
781     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
782       /* Create URL by appending :path value to the hostname */
783       gpr_asprintf(pp_url, "https://%s%s", host, value);
784       gpr_free(key);
785       gpr_free(value);
786       continue;
787     }
788     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
789     headers[num_headers].key = key;
790     headers[num_headers].value = value;
791     num_headers++;
792     if (curr == nullptr) {
793       break;
794     }
795   }
796   if (deadline != GRPC_MILLIS_INF_FUTURE) {
797     char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
798     char* value =
799         static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
800     grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
801                               value);
802     headers[num_headers].key = key;
803     headers[num_headers].value = value;
804
805     num_headers++;
806   }
807
808   *p_num_headers = num_headers;
809 }
810
/*
  Decodes the 5 byte gRPC message header at |data|.
  Byte 0 carries the compressed flag in its least significant bit; bytes 1-4
  are the message length as a big-endian 32-bit integer.
  The length is assembled in an unsigned type, because shifting a promoted
  (signed) byte >= 0x80 left by 24 is undefined behaviour. Lengths that do not
  fit in an int still convert to a negative value, so callers should treat a
  negative |*length| as invalid.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  uint32_t wire_length = 0;
  wire_length = (wire_length << 8) | data[1];
  wire_length = (wire_length << 8) | data[2];
  wire_length = (wire_length << 8) | data[3];
  wire_length = (wire_length << 8) | data[4];
  *length = (int)wire_length;
}
822
823 static bool header_has_authority(grpc_linked_mdelem* head) {
824   while (head != nullptr) {
825     if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
826                                       GRPC_MDSTR_AUTHORITY)) {
827       return true;
828     }
829     head = head->next;
830   }
831   return false;
832 }
833
/*
  Op Execution: Decide if one of the actions contained in the stream op can be
  executed. This is the heart of the state machine.

  curr_op  - the batch the action belongs to. It is only consulted for
             OP_ON_COMPLETE on a stream that is neither cancelled nor failed,
             to find out which ops the batch asked for.
  s        - the stream; supplies the stream-wide state (s->state) and the
             owning transport (s->curr_ct).
  op_state - state private to this batch. It is used for the per-batch
             "already executed" checks of OP_SEND_MESSAGE, OP_RECV_MESSAGE
             and OP_ON_COMPLETE.
  op_id    - the action being considered.

  Returns true if the action may run now. The function only reads state; its
  sole side effect is logging through CRONET_LOG.
*/
static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
                          struct stream_obj* s, struct op_state* op_state,
                          enum e_op_id op_id) {
  struct op_state* stream_state = &s->state;
  grpc_cronet_transport* t = s->curr_ct;
  bool result = true;
  /* When call is canceled, every op can be run, except under following
  conditions
  */
  bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
                               stream_state->state_callback_received[OP_FAILED];
  if (is_canceled_or_failed) {
    /* Send ops and a further cancel are never run once the stream is
       cancelled or failed. */
    if (op_id == OP_SEND_INITIAL_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_MESSAGE) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_SEND_TRAILING_METADATA) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    if (op_id == OP_CANCEL_ERROR) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_INITIAL_METADATA &&
        stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed (per-batch state, not stream state) */
    if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* already executed */
    if (op_id == OP_RECV_TRAILING_METADATA &&
        stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* ON_COMPLETE can be processed if one of the following conditions is met:
     * 1. the stream failed
     * 2. the stream is cancelled, and the callback is received
     * 3. the stream succeeded before cancel is effective
     * 4. the stream is cancelled, and the stream is never started */
    if (op_id == OP_ON_COMPLETE &&
        !(stream_state->state_callback_received[OP_FAILED] ||
          stream_state->state_callback_received[OP_CANCELED] ||
          stream_state->state_callback_received[OP_SUCCEEDED] ||
          !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
  } else if (op_id == OP_SEND_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
  } else if (op_id == OP_RECV_INITIAL_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't received headers yet. */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_SEND_MESSAGE) {
    /* already executed (note we're checking op specific state, not stream
     state) */
    if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
    /* we haven't sent headers yet. */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
  } else if (op_id == OP_RECV_MESSAGE) {
    /* already executed */
    if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
    /* we haven't received headers yet. */
    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
             !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
      result = false;
  } else if (op_id == OP_RECV_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
    /* we have asked for but haven't received message yet. */
    else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
             !stream_state->state_op_done[OP_RECV_MESSAGE])
      result = false;
    /* we haven't received trailers  yet. */
    else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
      result = false;
    /* we haven't received on_succeeded  yet. */
    else if (!stream_state->state_callback_received[OP_SUCCEEDED])
      result = false;
  } else if (op_id == OP_SEND_TRAILING_METADATA) {
    /* already executed */
    if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
    /* we haven't sent initial metadata yet */
    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
      result = false;
    /* we haven't sent message yet */
    else if (stream_state->pending_send_message &&
             !stream_state->state_op_done[OP_SEND_MESSAGE])
      result = false;
    /* we haven't got on_write_completed for the send yet; this check is
       waived when packet coalescing is on and the write is pending for the
       trailer */
    else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
             !(t->use_packet_coalescing &&
               stream_state->pending_write_for_trailer))
      result = false;
  } else if (op_id == OP_CANCEL_ERROR) {
    /* already executed */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
  } else if (op_id == OP_ON_COMPLETE) {
    /* already executed (note we're checking op specific state, not stream
    state) */
    if (op_state->state_op_done[OP_ON_COMPLETE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    }
    /* Check if every op that was asked for is done. */
    /* TODO(muxi): We should not consider the recv ops here, since they
     * have their own callbacks.  We should invoke a batch's on_complete
     * as soon as all of the batch's send ops are complete, even if
     * there are still recv ops pending. */
    else if (curr_op->send_initial_metadata &&
             !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !op_state->state_op_done[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_message &&
               !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->send_trailing_metadata &&
               !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_initial_metadata &&
               !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_message &&
               !op_state->state_op_done[OP_RECV_MESSAGE]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->cancel_stream &&
               !stream_state->state_callback_received[OP_CANCELED]) {
      CRONET_LOG(GPR_DEBUG, "Because");
      result = false;
    } else if (curr_op->recv_trailing_metadata) {
      /* We aren't done with trailing metadata yet */
      if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
        CRONET_LOG(GPR_DEBUG, "Because");
        result = false;
      }
      /* We've asked for actual message in an earlier op, and it hasn't been
        delivered yet. */
      else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
        /* If this op is not the one asking for read, (which means some earlier
          op has asked), and the read hasn't been delivered. */
        if (!curr_op->recv_message &&
            !stream_state->state_callback_received[OP_SUCCEEDED]) {
          CRONET_LOG(GPR_DEBUG, "Because");
          result = false;
        }
      }
    }
    /* We should see at least one on_write_completed for the trailers that we
      sent */
    else if (curr_op->send_trailing_metadata &&
             !stream_state->state_callback_received[OP_SEND_MESSAGE])
      result = false;
  }
  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
             result ? "YES" : "NO");
  return result;
}
1021
/*
  Try to execute one pending action of the batch wrapped by |oas|.

  The actions are tried in a fixed order: send initial metadata, send message,
  send trailing metadata, recv initial metadata, recv message, recv trailing
  metadata, cancel, on_complete. The first action that the batch requested and
  that op_can_be_run() permits is executed; because the branches form a single
  if / else-if chain, at most one action runs per call.

  Returns one of ACTION_TAKEN_WITH_CALLBACK, ACTION_TAKEN_NO_CALLBACK or
  NO_ACTION_POSSIBLE. NOTE(review): the names suggest "a cronet callback will
  follow", "progress was made synchronously" and "nothing could be done";
  confirm against the caller, which is outside this section.

  TODO (makdharma): Break down this function in smaller chunks for readability.
*/
static enum e_op_result execute_stream_op(struct op_and_state* oas) {
  grpc_transport_stream_op_batch* stream_op = &oas->op;
  struct stream_obj* s = oas->s;
  grpc_cronet_transport* t = s->curr_ct;
  struct op_state* stream_state = &s->state;
  enum e_op_result result = NO_ACTION_POSSIBLE;
  if (stream_op->send_initial_metadata &&
      op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
     * on_failed */
    GPR_ASSERT(s->cbs == nullptr);
    GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
    s->cbs =
        bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
    CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
    if (t->use_packet_coalescing) {
      bidirectional_stream_disable_auto_flush(s->cbs, true);
      bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
    }
    char* url = nullptr;
    /* Default method; convert_metadata_to_cronet_headers may overwrite it. */
    const char* method = "POST";
    s->header_array.headers = nullptr;
    convert_metadata_to_cronet_headers(
        stream_op->payload->send_initial_metadata.send_initial_metadata,
        t->host, &url, &s->header_array.headers, &s->header_array.count,
        &method);
    s->header_array.capacity = s->header_array.count;
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
    bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
    if (url) {
      gpr_free(url);
    }
    /* Release the individual key/value strings once the stream has been
       started. The headers array itself is not freed in this branch. */
    unsigned int header_index;
    for (header_index = 0; header_index < s->header_array.count;
         header_index++) {
      gpr_free((void*)s->header_array.headers[header_index].key);
      gpr_free((void*)s->header_array.headers[header_index].value);
    }
    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
    if (t->use_packet_coalescing) {
      /* With coalescing, nothing else in this batch will trigger a flush, so
         request one via flush_cronet_when_ready. */
      if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
        s->state.flush_cronet_when_ready = true;
      }
    }
    result = ACTION_TAKEN_WITH_CALLBACK;
  } else if (stream_op->send_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
    stream_state->pending_send_message = false;
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      grpc_slice_buffer write_slice_buffer;
      grpc_slice slice;
      grpc_slice_buffer_init(&write_slice_buffer);
      /* The whole message is requested and pulled as a single slice. */
      if (1 != stream_op->payload->send_message.send_message->Next(
                   stream_op->payload->send_message.send_message->length(),
                   nullptr)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      if (GRPC_ERROR_NONE !=
          stream_op->payload->send_message.send_message->Pull(&slice)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      grpc_slice_buffer_add(&write_slice_buffer, slice);
      if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
        /* Empty request not handled yet */
        gpr_log(GPR_ERROR, "Empty request is not supported");
        GPR_ASSERT(write_slice_buffer.count == 1);
      }
      if (write_slice_buffer.count > 0) {
        size_t write_buffer_size;
        create_grpc_frame(
            &write_slice_buffer, &stream_state->ws.write_buffer,
            &write_buffer_size,
            stream_op->payload->send_message.send_message->flags());
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
                   stream_state->ws.write_buffer);
        /* Clear the write-completed flag before issuing the write. */
        stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
        bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
                                   static_cast<int>(write_buffer_size), false);
        grpc_slice_buffer_destroy_internal(&write_slice_buffer);
        if (t->use_packet_coalescing) {
          if (!stream_op->send_trailing_metadata) {
            CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
            bidirectional_stream_flush(s->cbs);
            result = ACTION_TAKEN_WITH_CALLBACK;
          } else {
            /* The flush is left to the trailing-metadata write of the same
               batch. */
            stream_state->pending_write_for_trailer = true;
            result = ACTION_TAKEN_NO_CALLBACK;
          }
        } else {
          result = ACTION_TAKEN_WITH_CALLBACK;
        }
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    }
    /* Mark the send as done in both the stream-wide and the per-batch state,
       then drop the message stream. */
    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
    stream_op->payload->send_message.send_message.reset();
  } else if (stream_op->send_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_SEND_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      /* Zero-length write with the final argument set to true. */
      bidirectional_stream_write(s->cbs, "", 0, true);
      if (t->use_packet_coalescing) {
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
        bidirectional_stream_flush(s->cbs);
      }
      result = ACTION_TAKEN_WITH_CALLBACK;
    }
    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
  } else if (stream_op->recv_initial_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
    /* In every case the ready closure is scheduled with GRPC_ERROR_NONE; only
       the last case publishes the buffered initial metadata first. */
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.initial_metadata,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata);
      GRPC_CLOSURE_SCHED(
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    }
    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->recv_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      CRONET_LOG(GPR_DEBUG, "Stream failed.");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.read_stream_closed == true) {
      /* No more data will be received */
      CRONET_LOG(GPR_DEBUG, "read stream closed");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->flush_read) {
      CRONET_LOG(GPR_DEBUG, "flush read");
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.length_field_received == false) {
      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
          stream_state->rs.remaining_bytes == 0) {
        /* The full header has arrived: decode it, then either read the
           payload or deliver an empty message. */
        /* Start a read operation for data */
        stream_state->rs.length_field_received = true;
        parse_grpc_header(
            reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
            &stream_state->rs.length_field, &stream_state->rs.compressed);
        CRONET_LOG(GPR_DEBUG, "length field = %d",
                   stream_state->rs.length_field);
        if (stream_state->rs.length_field > 0) {
          stream_state->rs.read_buffer = static_cast<char*>(
              gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
          GPR_ASSERT(stream_state->rs.read_buffer);
          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
          stream_state->rs.received_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_WITH_CALLBACK;
        } else {
          stream_state->rs.remaining_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
          /* Clean up read_slice_buffer in case there is unread data. */
          grpc_slice_buffer_destroy_internal(
              &stream_state->rs.read_slice_buffer);
          grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
          uint32_t flags = 0;
          if (stream_state->rs.compressed) {
            flags |= GRPC_WRITE_INTERNAL_COMPRESS;
          }
          stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
          stream_op->payload->recv_message.recv_message->reset(
              stream_state->rs.sbs.get());
          GRPC_CLOSURE_SCHED(
              stream_op->payload->recv_message.recv_message_ready,
              GRPC_ERROR_NONE);
          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
          oas->state.state_op_done[OP_RECV_MESSAGE] = true;

          /* Extra read to trigger on_succeed */
          stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
          stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
          stream_state->rs.received_bytes = 0;
          stream_state->rs.compressed = false;
          stream_state->rs.length_field_received = false;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_NO_CALLBACK;
        }
      } else if (stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for first 5 bytes (GRPC header) */
        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
        stream_state->rs.received_bytes = 0;
        stream_state->rs.compressed = false;
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
        stream_state->state_op_done[OP_READ_REQ_MADE] =
            true; /* Indicates that at least one read request has been made */
        bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                  stream_state->rs.remaining_bytes);
        stream_state->pending_read_from_cronet = true;
        result = ACTION_TAKEN_WITH_CALLBACK;
      } else {
        /* A header read is still outstanding (remaining_bytes != 0). */
        result = NO_ACTION_POSSIBLE;
      }
    } else if (stream_state->rs.remaining_bytes == 0) {
      /* Header and payload are both complete: copy the payload into a slice
         and hand it to the upper layer. */
      CRONET_LOG(GPR_DEBUG, "read operation complete");
      grpc_slice read_data_slice =
          GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
      uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
      memcpy(dst_p, stream_state->rs.read_buffer,
             static_cast<size_t>(stream_state->rs.length_field));
      null_and_maybe_free_read_buffer(s);
      /* Clean up read_slice_buffer in case there is unread data. */
      grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
      uint32_t flags = 0;
      if (stream_state->rs.compressed) {
        flags = GRPC_WRITE_INTERNAL_COMPRESS;
      }
      stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
      stream_op->payload->recv_message.recv_message->reset(
          stream_state->rs.sbs.get());
      GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
                         GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      /* Do an extra read to trigger on_succeeded() callback in case connection
         is closed */
      stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
      stream_state->rs.compressed = false;
      stream_state->rs.received_bytes = 0;
      stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
      stream_state->rs.length_field_received = false;
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
      bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                stream_state->rs.remaining_bytes);
      stream_state->pending_read_from_cronet = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    }
  } else if (stream_op->recv_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
    /* Pick the error handed to the ready closure: the stored cancel error, an
       UNAVAILABLE error on failure, otherwise none (publishing trailers if
       they are valid). */
    grpc_error* error = GRPC_ERROR_NONE;
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      error = GRPC_ERROR_REF(stream_state->cancel_error);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
    } else if (oas->s->state.rs.trailing_metadata_valid) {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.trailing_metadata,
          stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
      stream_state->rs.trailing_metadata_valid = false;
    }
    GRPC_CLOSURE_SCHED(
        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        error);
    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->cancel_stream &&
             op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
    /* Only cancel a cronet stream that was actually created. */
    if (s->cbs) {
      CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
      bidirectional_stream_cancel(s->cbs);
      result = ACTION_TAKEN_WITH_CALLBACK;
    } else {
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
    /* Keep the first cancel error; later ones do not overwrite it. */
    if (!stream_state->cancel_error) {
      stream_state->cancel_error =
          GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
    }
  } else if (stream_op->on_complete &&
             op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      GRPC_CLOSURE_SCHED(stream_op->on_complete,
                         GRPC_ERROR_REF(stream_state->cancel_error));
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      GRPC_CLOSURE_SCHED(
          stream_op->on_complete,
          make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
    } else {
      /* All actions in this stream_op are complete. Call the on_complete
       * callback
       */
      GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
    }
    oas->state.state_op_done[OP_ON_COMPLETE] = true;
    oas->done = true;
    /* reset any send message state, only if this ON_COMPLETE is about a send.
     */
    if (stream_op->send_message) {
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
    }
    result = ACTION_TAKEN_NO_CALLBACK;
    /* If this is the on_complete callback being called for a received message -
      make a note */
    if (stream_op->recv_message)
      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
  } else {
    result = NO_ACTION_POSSIBLE;
  }
  return result;
}
1385
1386 /*
1387   Functions used by upper layers to access transport functionality.
1388 */
1389
/*
  Construct the per-stream state.

  gt       - owning transport; stored in curr_ct after being reinterpreted as
             a grpc_cronet_transport.
  gs       - the grpc_stream handle, stored in curr_gs.
  refcount - the stream refcount, stored as-is.
  arena    - stored, and also passed on to the constructor of |state|.

  Besides initialising the members, the constructor takes a stream ref tagged
  "cronet transport" and initialises the stream mutex.
*/
inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
                              grpc_stream_refcount* refcount,
                              grpc_core::Arena* arena)
    : arena(arena),
      curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
      curr_gs(gs),
      state(arena),
      refcount(refcount) {
  /* NOTE(review): the matching unref is not in this section; presumably it
     happens in the cronet terminal callbacks - confirm. */
  GRPC_CRONET_STREAM_REF(this, "cronet transport");
  gpr_mu_init(&mu);
}
1401
1402 inline stream_obj::~stream_obj() {
1403   null_and_maybe_free_read_buffer(this);
1404   /* Clean up read_slice_buffer in case there is unread data. */
1405   grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1406   GRPC_ERROR_UNREF(state.cancel_error);
1407 }
1408
1409 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1410                        grpc_stream_refcount* refcount, const void* server_data,
1411                        grpc_core::Arena* arena) {
1412   new (gs) stream_obj(gt, gs, refcount, arena);
1413   return 0;
1414 }
1415
1416 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1417                                    grpc_pollset* pollset) {}
1418
1419 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1420                                        grpc_pollset_set* pollset_set) {}
1421
1422 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1423                               grpc_transport_stream_op_batch* op) {
1424   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1425   if (op->send_initial_metadata &&
1426       header_has_authority(op->payload->send_initial_metadata
1427                                .send_initial_metadata->list.head)) {
1428     /* Cronet does not support :authority header field. We cancel the call when
1429      this field is present in metadata */
1430     if (op->recv_initial_metadata) {
1431       GRPC_CLOSURE_SCHED(
1432           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1433           GRPC_ERROR_CANCELLED);
1434     }
1435     if (op->recv_message) {
1436       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
1437                          GRPC_ERROR_CANCELLED);
1438     }
1439     if (op->recv_trailing_metadata) {
1440       GRPC_CLOSURE_SCHED(
1441           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1442           GRPC_ERROR_CANCELLED);
1443     }
1444     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
1445     return;
1446   }
1447   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1448   add_to_storage(s, op);
1449   execute_from_storage(s);
1450 }
1451
1452 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1453                            grpc_closure* then_schedule_closure) {
1454   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1455   s->~stream_obj();
1456   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1457 }
1458
1459 static void destroy_transport(grpc_transport* gt) {}
1460
1461 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1462
1463 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1464
/* Transport vtable for the cronet transport. The initialiser is positional,
   so the order of the entries must match the field order of
   grpc_transport_vtable (declared outside this file section).
   grpc_create_cronet_transport installs this table in ct->base.vtable. */
static const grpc_transport_vtable grpc_cronet_vtable = {
    /* NOTE(review): presumably the per-stream storage size; init_stream
       constructs a stream_obj in that storage - confirm. */
    sizeof(stream_obj),
    "cronet_http",
    init_stream,
    /* pollset and pollset_set hooks are no-ops for this transport */
    set_pollset_do_nothing,
    set_pollset_set_do_nothing,
    perform_stream_op,
    /* perform_op, destroy_transport and get_endpoint are stubs */
    perform_op,
    destroy_stream,
    destroy_transport,
    get_endpoint};
1476
1477 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1478                                              const grpc_channel_args* args,
1479                                              void* reserved) {
1480   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1481       gpr_malloc(sizeof(grpc_cronet_transport)));
1482   if (!ct) {
1483     goto error;
1484   }
1485   ct->base.vtable = &grpc_cronet_vtable;
1486   ct->engine = static_cast<stream_engine*>(engine);
1487   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1488   if (!ct->host) {
1489     goto error;
1490   }
1491   strcpy(ct->host, target);
1492
1493   ct->use_packet_coalescing = true;
1494   if (args) {
1495     for (size_t i = 0; i < args->num_args; i++) {
1496       if (0 ==
1497           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1498         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1499           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1500                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1501         } else {
1502           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1503         }
1504       }
1505     }
1506   }
1507
1508   return &ct->base;
1509
1510 error:
1511   if (ct) {
1512     if (ct->host) {
1513       gpr_free(ct->host);
1514     }
1515     gpr_free(ct);
1516   }
1517
1518   return nullptr;
1519 }