Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc / deps / grpc / src / core / lib / surface / call.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <grpc/compression.h>
28 #include <grpc/grpc.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/compression/algorithm_metadata.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/alloc.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gpr/time_precise.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/arena.h"
42 #include "src/core/lib/gprpp/manual_constructor.h"
43 #include "src/core/lib/gprpp/ref_counted.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/profiling/timers.h"
46 #include "src/core/lib/slice/slice_string_helpers.h"
47 #include "src/core/lib/slice/slice_utils.h"
48 #include "src/core/lib/surface/api_trace.h"
49 #include "src/core/lib/surface/call.h"
50 #include "src/core/lib/surface/call_test_only.h"
51 #include "src/core/lib/surface/channel.h"
52 #include "src/core/lib/surface/completion_queue.h"
53 #include "src/core/lib/surface/server.h"
54 #include "src/core/lib/surface/validate_metadata.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/metadata.h"
57 #include "src/core/lib/transport/static_metadata.h"
58 #include "src/core/lib/transport/status_metadata.h"
59 #include "src/core/lib/transport/transport.h"
60
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/* Upper bound on the metadata elements a caller may attach at call-creation
   time (see grpc_call::send_extra_metadata). */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
76
/* Per-batch bookkeeping for one in-flight grpc_call_start_batch (or internal)
   operation on a call. */
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure count as six bytes
     per call, so any savings we can make are worthwhile,

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Outstanding sub-steps that must finish before the batch as a whole
  // completes; decremented by completed_batch_step().
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // Error recorded for the batch, stored as a gpr_atm so it can be set from
  // concurrent completion paths.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Returns true for exactly one caller: the one that finishes the last
  // remaining step (fetch-sub observing the counter at 1).
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
112
/* Lazily-created state hung off a call that has child calls; child_list_mu
   guards the circular sibling list rooted at first_child. */
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  gpr_mu child_list_mu;
  grpc_call* first_child = nullptr;
};
120
/* Linkage for a call created with a parent (server call spawning client
   calls); co-allocated with the call itself (see grpc_call_create). */
struct child_call {
  child_call(grpc_call* parent) : parent(parent) {}
  grpc_call* parent;
  /** siblings: children of the same parent form a list, and this list is
     protected under
      parent->mu */
  grpc_call* sibling_next = nullptr;
  grpc_call* sibling_prev = nullptr;
};
130
/* Sentinel values for grpc_call::recv_state (see the state diagram there). */
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)

struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    // Initialize all four metadata batches (send/recv x initial/trailing)
    // with an unset deadline.
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  ~grpc_call() {
    // error_string is heap-allocated by grpc_error_get_status() (see
    // destroy_call) and must be freed here.
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  grpc_core::RefCount ext_ref;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data useful used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Set once by the first cancellation (see cancel_with_error()).
  gpr_atm cancelled = 0;

  grpc_closure release_call;

  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_server* server;
    } server;
  } final_op;
  // grpc_error* holding the call's final status; consumed in destroy_call.
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
264
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

/* A call and its call stack are allocated contiguously: the stack starts
   immediately after the (alignment-padded) grpc_call object. These macros
   convert between the two views of the same allocation. */
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))

/* Forward declarations for helpers defined later in this file. */
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
291
292 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
293   if (new_err == GRPC_ERROR_NONE) return;
294   if (*composite == GRPC_ERROR_NONE)
295     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
296   *composite = grpc_error_add_child(*composite, new_err);
297 }
298
/* Public API: allocates 'size' bytes from the call's arena.  The memory is
   reclaimed in bulk when the arena is destroyed (see release_call()). */
void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
  return call->arena->Alloc(size);
}
302
/* Returns the call's parent_call, creating it on first use.  Creation races
   are resolved with a compare-and-swap: the loser destroys its freshly
   constructed instance (arena memory itself is not reclaimed) and adopts
   the winner's. */
static parent_call* get_or_create_parent_call(grpc_call* call) {
  parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
  if (p == nullptr) {
    p = call->arena->New<parent_call>();
    if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
                         (gpr_atm)p)) {
      // Another thread won the race; drop ours and reload the winner.
      p->~parent_call();
      p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
    }
  }
  return p;
}
315
/* Returns the call's parent_call, or nullptr if none has been created. */
static parent_call* get_parent_call(grpc_call* call) {
  return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
319
320 size_t grpc_call_get_initial_size_estimate() {
321   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
322          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
323 }
324
/* Creates a call (client or server side) per *args and returns it via
   *out_call.  The call, its filter stack, and (if present) its child_call
   linkage are carved out of a single arena allocation.  Returns an
   aggregated error if any initialization step failed; the call is still
   returned (already cancelled) in that case. */
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // Call object + filter stack are one contiguous block; child_call (if any)
  // is appended after them.
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    // Stash the caller-provided initial metadata and remember the :path
    // value (method name) for call-stack initialization below.
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // child_call lives in the same allocation, right after the call stack.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = 1;
      // If the parent already finished, cancel this child right away
      // (after stack init below).
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert at the tail of the circular sibling list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // Record call-started stats against the owning channelz node.
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else {
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      channelz_server->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
484
/* Binds a completion queue to a call created without one.  Aborts if the
   call was already bound to a pollset_set; takes a ref on the cq (released
   in destroy_call). */
void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {
  GPR_ASSERT(cq);

  if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
    gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
    abort();
  }
  call->cq = cq;
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                             &call->pollent);
}
499
/* Internal refs are forwarded to the call stack's refcount; a textual
   reason is only tracked in debug builds. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
513
/* Final teardown closure: runs the call's destructor and frees its arena.
   channel/arena are copied to locals first because the destructor wipes the
   object they are read from.  Also feeds the arena's final size back into
   the channel's size estimator and drops the channel ref taken at create. */
static void release_call(void* call, grpc_error* error) {
  grpc_call* c = static_cast<grpc_call*>(call);
  grpc_channel* channel = c->channel;
  grpc_core::Arena* arena = c->arena;
  c->~grpc_call();
  grpc_channel_update_call_size_estimate(channel, arena->Destroy());
  GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
522
/* Invoked by the call stack when the last internal ref is dropped.  Releases
   call-held resources, resolves the final status, and destroys the stack,
   chaining into release_call() for the actual memory teardown. */
static void destroy_call(void* call, grpc_error* error) {
  GPR_TIMER_SCOPE("destroy_call", 0);
  size_t i;
  int ii;
  grpc_call* c = static_cast<grpc_call*>(call);
  // Only the receiving batches are destroyed here; NOTE(review): the sending
  // batches appear to be cleaned up elsewhere — confirm before changing.
  for (i = 0; i < 2; i++) {
    grpc_metadata_batch_destroy(
        &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
  }
  c->receiving_stream.reset();
  parent_call* pc = get_parent_call(c);
  if (pc != nullptr) {
    // Arena-allocated: run the destructor only, no free.
    pc->~parent_call();
  }
  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
  }
  for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (c->context[i].destroy) {
      c->context[i].destroy(c->context[i].value);
    }
  }
  if (c->cq) {
    GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
  }

  // Translate the stored status error into final_info for the filters;
  // error_string is heap-allocated and freed by ~grpc_call().
  grpc_error* status_error =
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
  grpc_error_get_status(status_error, c->send_deadline,
                        &c->final_info.final_status, nullptr, nullptr,
                        &(c->final_info.error_string));
  GRPC_ERROR_UNREF(status_error);
  c->final_info.stats.latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                          GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                            grpc_schedule_on_exec_ctx));
}
561
562 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
563
564 void grpc_call_unref(grpc_call* c) {
565   if (GPR_LIKELY(!c->ext_ref.Unref())) return;
566
567   GPR_TIMER_SCOPE("grpc_call_unref", 0);
568
569   child_call* cc = c->child;
570   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
571   grpc_core::ExecCtx exec_ctx;
572
573   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
574
575   if (cc) {
576     parent_call* pc = get_parent_call(cc->parent);
577     gpr_mu_lock(&pc->child_list_mu);
578     if (c == pc->first_child) {
579       pc->first_child = cc->sibling_next;
580       if (c == pc->first_child) {
581         pc->first_child = nullptr;
582       }
583     }
584     cc->sibling_prev->child->sibling_next = cc->sibling_next;
585     cc->sibling_next->child->sibling_prev = cc->sibling_prev;
586     gpr_mu_unlock(&pc->child_list_mu);
587     GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
588   }
589
590   GPR_ASSERT(!c->destroy_called);
591   c->destroy_called = 1;
592   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
593                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
594   if (cancel) {
595     cancel_with_error(c, GRPC_ERROR_CANCELLED);
596   } else {
597     // Unset the call combiner cancellation closure.  This has the
598     // effect of scheduling the previously set cancellation closure, if
599     // any, so that it can release any internal references it may be
600     // holding to the call stack. Also flush the closures on exec_ctx so that
601     // filters that schedule cancel notification closures on exec_ctx do not
602     // need to take a ref of the call stack to guarantee closure liveness.
603     c->call_combiner.SetNotifyOnCancel(nullptr);
604     grpc_core::ExecCtx::Get()->Flush();
605   }
606   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
607 }
608
/* Public API: cancels a call with the generic CANCELLED error.  'reserved'
   must be null.  Always returns GRPC_CALL_OK. */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
617
// This is called via the call combiner to start sending a batch down
// the filter stack.  The owning call was stashed in the batch's
// handler_private.extra_arg by execute_batch() below.
static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
  // Hand the batch to the top element of the filter stack.
  grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
  elem->filter->start_transport_stream_op_batch(elem, batch);
}
629
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.  The batch itself is dispatched from
// execute_batch_in_call_combiner() once the combiner grants entry.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
641
642 char* grpc_call_get_peer(grpc_call* call) {
643   char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
644   if (peer_string != nullptr) return gpr_strdup(peer_string);
645   peer_string = grpc_channel_get_target(call->channel);
646   if (peer_string != nullptr) return peer_string;
647   return gpr_strdup("unknown");
648 }
649
/* Recovers the owning grpc_call from the top element of its filter stack
   (inverse of the contiguous-allocation layout; see CALL_FROM_TOP_ELEM). */
grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
  return CALL_FROM_TOP_ELEM(elem);
}
653
654 /*******************************************************************************
655  * CANCELLATION
656  */
657
/* Public API: cancels a call with an application-supplied status code and
   description.  'reserved' must be null; always returns GRPC_CALL_OK.
   'description' may be short-lived (it is copied; see error_from_status). */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
672
/* Heap-allocated context for one cancel_stream batch; freed by
   done_termination() when the batch completes. */
typedef struct {
  grpc_call* call;
  grpc_closure start_batch;
  grpc_closure finish_batch;
} cancel_state;
678
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack.  Yields the call combiner when the batch is done,
// then drops the "termination" ref taken in cancel_with_error() and
// frees the cancel_state.
static void done_termination(void* arg, grpc_error* error) {
  cancel_state* state = static_cast<cancel_state*>(arg);
  GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
                          "on_complete for cancel_stream op");
  GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
  gpr_free(state);
}
688
/* Cancels the call with 'error' (ownership transferred).  Only the first
   cancellation takes effect — the 'cancelled' flag is claimed with a CAS
   and later attempts just drop their error. */
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
    // Lost the race: someone already cancelled this call.
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Keep the call alive until done_termination() runs.
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner.  This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
710
/* Internal entry point for cancelling a call with the generic CANCELLED
   error (used by other core components, not the public API). */
void grpc_call_cancel_internal(grpc_call* call) {
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
714
/* Builds a grpc_error carrying 'status' and 'description'.  'description'
   is copied (both into the error message and the GRPC_MESSAGE slice) so
   that grpc_call_cancel_with_status can accept a short-lived string. */
static grpc_error* error_from_status(grpc_status_code status,
                                     const char* description) {
  return grpc_error_set_int(
      grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
                         GRPC_ERROR_STR_GRPC_MESSAGE,
                         grpc_slice_from_copied_string(description)),
      GRPC_ERROR_INT_GRPC_STATUS, status);
}
725
/* Convenience wrapper: cancels the call with an error synthesized from a
   status code and description. */
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description) {
  cancel_with_error(c, error_from_status(status, description));
}
730
/* Records the call's final status.  Takes ownership of 'error': on the
   client path it is stored in status_error (unreffed in destroy_call);
   on the server path it is unreffed here.  Also updates channelz
   call-success/failure counters. */
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    // Translate the error into the out-params the application registered
    // via its recv_status_on_client op.
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // Server side only reports whether the call was cancelled: either this
    // error is set, or a status error was recorded earlier.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE ||
        reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
            GRPC_ERROR_NONE;
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      if (*call->final_op.server.cancelled) {
        channelz_server->RecordCallFailed();
      } else {
        channelz_server->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
770
771 /*******************************************************************************
772  * COMPRESSION
773  */
774
/* Record the message compression algorithm announced by the peer for
   incoming data. \a algo must be a valid algorithm value. */
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
780
/* Record the stream compression algorithm announced by the peer for
   incoming data. \a algo must be a valid algorithm value. */
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
786
/* Test-only accessor: the overall compression algorithm for incoming data,
   combined from the recorded message- and stream-compression algorithms.
   Defaults to GRPC_COMPRESS_NONE if the combination cannot be formed. */
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
    grpc_call* call) {
  grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
  grpc_compression_algorithm_from_message_stream_compression_algorithm(
      &algorithm, call->incoming_message_compression_algorithm,
      call->incoming_stream_compression_algorithm);
  return algorithm;
}
795
/* Pick a concrete compression algorithm for the requested compression
   \a level, restricted to the encodings the peer has advertised. */
static grpc_compression_algorithm compression_algorithm_for_level_locked(
    grpc_call* call, grpc_compression_level level) {
  return grpc_compression_algorithm_for_level(level,
                                              call->encodings_accepted_by_peer);
}
801
802 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
803   uint32_t flags;
804   flags = call->test_only_last_message_flags;
805   return flags;
806 }
807
/* User-data destructor for the cached accept-encoding bitset. The value is
   an integer encoded directly in the pointer (see
   set_encodings_accepted_by_peer), so there is nothing to free. */
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
809
/* Parse an accept-encoding style metadata element \a mdel into a bitset of
   compression algorithms accepted by the peer, written to
   *encodings_accepted_by_peer. \a stream_encoding selects whether entries
   are parsed as stream- or message-compression algorithm names.
   The parsed bitset is cached on the mdelem as user data, stored +1 so that
   a legitimate bitset of 0 is distinguishable from "no cached value". */
static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel,
                                           uint32_t* encodings_accepted_by_peer,
                                           bool stream_encoding) {
  size_t i;
  uint32_t algorithm;
  grpc_slice_buffer accept_encoding_parts;
  grpc_slice accept_encoding_slice;
  void* accepted_user_data;

  /* Fast path: reuse a previously cached parse result. */
  accepted_user_data =
      grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
  if (accepted_user_data != nullptr) {
    *encodings_accepted_by_peer =
        static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
    return;
  }

  *encodings_accepted_by_peer = 0;

  /* The metadata value is a comma-separated list of algorithm names. */
  accept_encoding_slice = GRPC_MDVALUE(mdel);
  grpc_slice_buffer_init(&accept_encoding_parts);
  grpc_slice_split_without_space(accept_encoding_slice, ",",
                                 &accept_encoding_parts);

  /* "no compression" is always accepted. */
  GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  for (i = 0; i < accept_encoding_parts.count; i++) {
    int r;
    grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
    if (!stream_encoding) {
      r = grpc_message_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
    } else {
      r = grpc_stream_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
    }
    if (r) {
      GPR_BITSET(encodings_accepted_by_peer, algorithm);
    } else {
      /* Unrecognized entries are logged and skipped, not treated as
         errors. */
      char* accept_encoding_entry_str =
          grpc_slice_to_c_string(accept_encoding_entry_slice);
      gpr_log(GPR_DEBUG,
              "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
              accept_encoding_entry_str);
      gpr_free(accept_encoding_entry_str);
    }
  }

  grpc_slice_buffer_destroy_internal(&accept_encoding_parts);

  /* Cache the result on the mdelem (offset by 1; see above). */
  grpc_mdelem_set_user_data(
      mdel, destroy_encodings_accepted_by_peer,
      (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
}
865
866 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
867   uint32_t encodings_accepted_by_peer;
868   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
869   return encodings_accepted_by_peer;
870 }
871
/* Test-only accessor: the stream compression algorithm recorded for
   incoming data. */
grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
  return call->incoming_stream_compression_algorithm;
}
876
877 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
878   return (grpc_linked_mdelem*)&md->internal_data;
879 }
880
881 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
882                                   grpc_metadata* additional_metadata, int i,
883                                   int count) {
884   grpc_metadata* res =
885       i < count ? &metadata[i] : &additional_metadata[i - count];
886   GPR_ASSERT(res);
887   return res;
888 }
889
/* Validate application-supplied \a metadata (plus \a additional_metadata)
   and link the elements into the call's outgoing metadata batch (initial or
   trailing, per \a is_trailing).
   Returns 1 on success. Returns 0 if any element fails validation, in which
   case the mdelem refs created so far are released and nothing is linked.
   When \a prepend_extra_metadata is set, call->send_extra_metadata is linked
   ahead of the application metadata; send_extra_metadata_count is reset to 0
   once the batch has been built. */
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  /* First pass: validate each element and create its mdelem. Stops at the
     first invalid element (leaving i < total_count). */
  for (i = 0; i < total_count; i++) {
    const grpc_metadata* md =
        get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  if (i != total_count) {
    /* Validation failed: roll back the mdelems created so far. */
    for (int j = 0; j < i; j++) {
      const grpc_metadata* md =
          get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  /* Link any core-generated extra metadata ahead of the application's. */
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  /* Second pass: link the validated elements into the batch; an element
     that fails to link has its ref dropped and the error is logged. */
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  call->send_extra_metadata_count = 0;

  return 1;
}
952
953 static grpc_message_compression_algorithm decode_message_compression(
954     grpc_mdelem md) {
955   grpc_message_compression_algorithm algorithm =
956       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
957   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
958     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
959     gpr_log(GPR_ERROR,
960             "Invalid incoming message compression algorithm: '%s'. "
961             "Interpreting incoming data as uncompressed.",
962             md_c_str);
963     gpr_free(md_c_str);
964     return GRPC_MESSAGE_COMPRESS_NONE;
965   }
966   return algorithm;
967 }
968
969 static grpc_stream_compression_algorithm decode_stream_compression(
970     grpc_mdelem md) {
971   grpc_stream_compression_algorithm algorithm =
972       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
973   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
974     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
975     gpr_log(GPR_ERROR,
976             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
977             "incoming data as uncompressed.",
978             md_c_str);
979     gpr_free(md_c_str);
980     return GRPC_STREAM_COMPRESS_NONE;
981   }
982   return algorithm;
983 }
984
985 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
986                                  int is_trailing) {
987   if (b->list.count == 0) return;
988   if (!call->is_client && is_trailing) return;
989   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
990   GPR_TIMER_SCOPE("publish_app_metadata", 0);
991   grpc_metadata_array* dest;
992   grpc_metadata* mdusr;
993   dest = call->buffered_metadata[is_trailing];
994   if (dest->count + b->list.count > dest->capacity) {
995     dest->capacity =
996         GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
997     dest->metadata = static_cast<grpc_metadata*>(
998         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
999   }
1000   for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1001     mdusr = &dest->metadata[dest->count++];
1002     /* we pass back borrowed slices that are valid whilst the call is valid */
1003     mdusr->key = GRPC_MDKEY(l->md);
1004     mdusr->value = GRPC_MDVALUE(l->md);
1005   }
1006 }
1007
/* Filter a received initial metadata batch: extract (and remove from the
   batch) the compression-related elements, record the combined
   accepted-encodings bitset, then publish the remaining metadata to the
   application. */
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
  if (b->idx.named.content_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
    set_incoming_stream_compression_algorithm(
        call, decode_stream_compression(b->idx.named.content_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
  }
  if (b->idx.named.grpc_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
    set_incoming_message_compression_algorithm(
        call, decode_message_compression(b->idx.named.grpc_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
  }
  /* Defaults (1u) when the peer sent no accept-encoding headers; presumably
     this is the bit for "no compression" — the parse below always sets it. */
  uint32_t message_encodings_accepted_by_peer = 1u;
  uint32_t stream_encodings_accepted_by_peer = 1u;
  if (b->idx.named.grpc_accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                   &message_encodings_accepted_by_peer, false);
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
  }
  if (b->idx.named.accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                   &stream_encodings_accepted_by_peer, true);
    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
  }
  /* Fold the message- and stream-level bitsets into one combined bitset. */
  call->encodings_accepted_by_peer =
      grpc_compression_bitset_from_message_stream_compression_bitset(
          message_encodings_accepted_by_peer,
          stream_encodings_accepted_by_peer);
  publish_app_metadata(call, b, false);
}
1041
/* Filter a received trailing metadata batch: derive the call's final status
   from \a batch_error, or from the grpc-status/grpc-message elements, or
   from a fallback, then publish the remaining metadata to the application.
   Takes ownership of \a batch_error. */
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error* batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    /* Transport-level error wins; ownership passes to set_final_status. */
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    /* Build an error from the grpc-status (and optional grpc-message)
       trailers. */
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error* error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      char* peer_msg = nullptr;
      char* peer = grpc_call_get_peer(call);
      gpr_asprintf(&peer_msg, "Error received from peer %s", peer);
      error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(peer_msg),
                                 GRPC_ERROR_INT_GRPC_STATUS,
                                 static_cast<intptr_t>(status_code));
      gpr_free(peer);
      gpr_free(peer_msg);
    }
    if (b->idx.named.grpc_message != nullptr) {
      /* The message slice is reffed because grpc_error_set_str takes
         ownership of the slice it is given. */
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    /* set_final_status takes its own ref; drop ours afterwards. */
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    /* Server side with no status trailer: a clean finish. */
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    /* Client received trailing metadata without any status: report
       UNKNOWN. */
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1085
/* Accessor: the arena backing this call's allocations. */
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1087
/* Accessor: the call stack co-allocated with this call. */
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  return CALL_STACK_FROM_CALL(call);
}
1091
1092 /*******************************************************************************
1093  * BATCH API IMPLEMENTATION
1094  */
1095
1096 static bool are_write_flags_valid(uint32_t flags) {
1097   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1098   const uint32_t allowed_write_positions =
1099       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1100   const uint32_t invalid_positions = ~allowed_write_positions;
1101   return !(flags & invalid_positions);
1102 }
1103
1104 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1105   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1106   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1107   if (!is_client) {
1108     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1109   }
1110   return !(flags & invalid_positions);
1111 }
1112
/* Map an op type to its slot index in call->active_batches. Ops that share
   a slot are client-only/server-only counterparts (e.g. send-close-from-
   client vs. send-status-from-server), so they never collide on one call. */
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  GPR_UNREACHABLE_CODE(return 123456789);
}
1132
/* Obtain the batch_control for a batch whose first op is ops[0]. Each op
   slot caches one arena-allocated batch_control; it is reused once the
   previous batch in that slot has completed (signalled by bctl->call ==
   nullptr). Returns nullptr when a batch is still in flight for the same
   slot (caller treats this as too many pending batches). */
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops,
                                                      size_t num_ops) {
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    if (bctl->call != nullptr) {
      /* Slot still busy with a previous batch. */
      return nullptr;
    }
    /* Destroy in place and clear the op; the storage itself belongs to the
       arena and is reused. */
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1154
/* Completion-queue done callback for a batch: marks the batch_control as
   reusable (call == nullptr; see reuse_or_allocate_batch_control) and drops
   the completion's ref on the call. */
static void finish_batch_completion(void* user_data,
                                    grpc_cq_completion* storage) {
  batch_control* bctl = static_cast<batch_control*>(user_data);
  grpc_call* call = bctl->call;
  bctl->call = nullptr;
  GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
1162
/* Release any error recorded for the batch and reset the atomic slot to
   GRPC_ERROR_NONE. */
static void reset_batch_errors(batch_control* bctl) {
  GRPC_ERROR_UNREF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
  gpr_atm_rel_store(&bctl->batch_error,
                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
1169
/* Runs once every step of a batch has completed: performs per-op cleanup,
   propagates cancellation to children after trailing metadata has been
   received, then notifies the application either by scheduling its closure
   or by posting to the completion queue. */
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  grpc_error* error = GRPC_ERROR_REF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        /* Walk the circular sibling list, cancelling children that inherit
           cancellation. */
        do {
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    /* Once trailing metadata has been received, the batch error is not
       surfaced through this completion (the final status has already been
       recorded separately); report the batch itself as successful. */
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    /* Don't hand a partially received message to the application on
       failure. */
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    bctl->call = nullptr;
    /* This closure may be meant to be run within some combiner. Since we aren't
     * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
     * of GRPC_CLOSURE_RUN.
     */
    GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
                       error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1241
/* Mark one step of the batch as finished; when it was the last outstanding
   step, post the batch completion. */
static void finish_batch_step(batch_control* bctl) {
  if (GPR_UNLIKELY(bctl->completed_batch_step())) {
    post_batch_completion(bctl);
  }
}
1247
/* Pull slices from the receiving stream into *call->receiving_buffer until
   the message is complete, an error occurs, or the stream needs to wait for
   more data (in which case call->receiving_slice_ready fires later and
   resumes this loop). Finishes the batch step on completion or error. */
static void continue_receiving_slices(batch_control* bctl) {
  grpc_error* error;
  grpc_call* call = bctl->call;
  for (;;) {
    size_t remaining = call->receiving_stream->length() -
                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
    if (remaining == 0) {
      /* Whole message assembled. */
      call->receiving_message = 0;
      call->receiving_stream.reset();
      finish_batch_step(bctl);
      return;
    }
    /* Next() returning true means a slice is available synchronously;
       otherwise receiving_slice_ready will be invoked when one arrives. */
    if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
      error = call->receiving_stream->Pull(&call->receiving_slice);
      if (error == GRPC_ERROR_NONE) {
        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                              call->receiving_slice);
      } else {
        /* Pull failed: drop the partial message and finish the step. */
        call->receiving_stream.reset();
        grpc_byte_buffer_destroy(*call->receiving_buffer);
        *call->receiving_buffer = nullptr;
        call->receiving_message = 0;
        finish_batch_step(bctl);
        return;
      }
    } else {
      return;
    }
  }
}
1278
/* Callback invoked when the receiving stream has another slice available
   (continuation of continue_receiving_slices). On any error the partial
   message is dropped and the batch step finished. */
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  /* Errors passed into this callback are owned by the caller, but errors
     returned by ByteStream::Pull() are owned by us — track which case we
     are in so we unref only our own. */
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = 0;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1311
/* Start assembling an incoming message once initial metadata has been
   processed. A null receiving stream means no message (end of stream):
   report a null buffer and finish the step. Otherwise allocate an
   appropriately typed byte buffer and begin pulling slices. */
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    *call->receiving_buffer = nullptr;
    call->receiving_message = 0;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    /* Choose a compressed buffer when the message is flagged compressed and
       a message compression algorithm was negotiated. */
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}
1338
/* recv_message completion: on error, record it as the batch error and
   cancel the call. Then either process the message now, or — when initial
   metadata has not been seen yet — park this batch_control in
   call->recv_state for receiving_initial_metadata_ready() to resume. */
static void receiving_stream_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    /* Only the first error is kept as the batch error. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
    process_data_after_md(bctl);
  }
}
1359
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack.  Yields the call combiner
// before processing the received message.
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  /* Release the call combiner first so the (possibly long-running) message
     processing does not hold it. */
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}
1370
/* Cold path (kept out of line): the peer set both stream and message
   compression, which is unsupported; log and cancel with INTERNAL. */
static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call* call) {
  char* error_msg = nullptr;
  gpr_asprintf(&error_msg,
               "Incoming stream has both stream compression (%d) and message "
               "compression (%d).",
               call->incoming_stream_compression_algorithm,
               call->incoming_message_compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg);
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
  /* cancel_with_status copies the description, so the buffer can be freed. */
  gpr_free(error_msg);
}
1383
1384 static void GPR_ATTRIBUTE_NOINLINE
1385 handle_error_parsing_compression_algorithm(grpc_call* call) {
1386   char* error_msg = nullptr;
1387   gpr_asprintf(&error_msg,
1388                "Error in incoming message compression (%d) or stream "
1389                "compression (%d).",
1390                call->incoming_stream_compression_algorithm,
1391                call->incoming_message_compression_algorithm);
1392   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1393   gpr_free(error_msg);
1394 }
1395
/* Cold path (kept out of line): the combined compression algorithm value is
   out of range; log and cancel with UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  char* error_msg = nullptr;
  gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
               compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg);
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
  gpr_free(error_msg);
}
1405
/* Cold path (kept out of line): the incoming compression algorithm is valid
   but disabled by this channel's configuration; log and cancel with
   UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  char* error_msg = nullptr;
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
               algo_name);
  gpr_log(GPR_ERROR, "%s", error_msg);
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
  gpr_free(error_msg);
}
1417
/* Cold path (kept out of line): log — without cancelling — that the
   incoming compression algorithm is not in the accepted-encodings bitset. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_log(GPR_ERROR,
          "Compression algorithm ('%s') not present in the bitset of "
          "accepted encodings ('0x%x')",
          algo_name, call->encodings_accepted_by_peer);
}
1427
/* Validate the compression configuration implied by the received initial
   metadata. Invalid or disabled configurations cancel the call via the cold
   path helpers above; an unaccepted-but-valid algorithm is only logged. */
static void validate_filtered_metadata(batch_control* bctl) {
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  /* Stream and message compression may not be combined. */
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
                                 compression_algorithm))) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}
1464
/* recv_initial_metadata_ready callback: filters and validates the received
   metadata, adopts the deadline on the server side, resolves the ordering
   race against receiving_stream_ready() via call->recv_state, and finishes
   the batch step. */
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Only the first error is kept as the batch error. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    /* Resume the message processing that was parked waiting for initial
       metadata. */
    GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1522
1523 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1524   batch_control* bctl = static_cast<batch_control*>(bctlp);
1525   grpc_call* call = bctl->call;
1526   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1527   grpc_metadata_batch* md =
1528       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1529   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1530   finish_batch_step(bctl);
1531 }
1532
1533 static void finish_batch(void* bctlp, grpc_error* error) {
1534   batch_control* bctl = static_cast<batch_control*>(bctlp);
1535   grpc_call* call = bctl->call;
1536   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1537   if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1538       GRPC_ERROR_NONE) {
1539     gpr_atm_rel_store(&bctl->batch_error,
1540                       reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1541   }
1542   if (error != GRPC_ERROR_NONE) {
1543     cancel_with_error(call, GRPC_ERROR_REF(error));
1544   }
1545   finish_batch_step(bctl);
1546 }
1547
/* grpc_cq_completion done-callback used by zero-op batches: the completion
   object was heap-allocated solely to signal the tag, so just free it. */
static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
  gpr_free(completion);
}
1551
/* Translates an application-supplied array of grpc_ops into a single
 * grpc_transport_stream_op_batch and hands it down the filter stack.
 *
 * \a notify_tag is either a completion queue tag (is_notify_tag_closure ==
 * 0) or a grpc_closure* (is_notify_tag_closure != 0); it is signalled once
 * every op in the batch has finished.
 *
 * Returns GRPC_CALL_OK on success.  On validation failure control jumps to
 * done_with_error, which reverses any per-call state already mutated by
 * earlier ops in this batch before returning the error code. */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);

  /* An empty batch never reaches the transport: complete the tag/closure
     immediately. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  /* A batch_control tracks completion of all ops in this batch.  nullptr
     presumably means a conflicting batch is still in flight — the caller
     sees TOO_MANY_OPERATIONS. */
  bctl = reuse_or_allocate_batch_control(call, ops, nops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* Per-op compression level takes precedence over the channel's
           configured default. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* A client half-close is expressed as an (empty) trailing metadata
           send. */
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        /* Slot 0 carries grpc-status; slot 1 (optionally) grpc-message. */
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
            call->channel, op->data.send_status_from_server.status);
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Roll back the extra status/message mdelems taken above before
             bailing out. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* Ref the call for the duration of the batch (the "completion" ref). */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* One completion step for all send ops combined, plus one per recv op. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  /* Record that ops have been sent on this call, then hand the assembled
     batch to the filter stack. */
  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1959
1960 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1961                                       size_t nops, void* tag, void* reserved) {
1962   grpc_call_error err;
1963
1964   GRPC_API_TRACE(
1965       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1966       "reserved=%p)",
1967       5, (call, ops, (unsigned long)nops, tag, reserved));
1968
1969   if (reserved != nullptr) {
1970     err = GRPC_CALL_ERROR;
1971   } else {
1972     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1973     grpc_core::ExecCtx exec_ctx;
1974     err = call_start_batch(call, ops, nops, tag, 0);
1975   }
1976
1977   return err;
1978 }
1979
1980 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1981                                                   const grpc_op* ops,
1982                                                   size_t nops,
1983                                                   grpc_closure* closure) {
1984   return call_start_batch(call, ops, nops, closure, 1);
1985 }
1986
1987 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1988                            void* value, void (*destroy)(void* value)) {
1989   if (call->context[elem].destroy) {
1990     call->context[elem].destroy(call->context[elem].value);
1991   }
1992   call->context[elem].value = value;
1993   call->context[elem].destroy = destroy;
1994 }
1995
/* Returns the value stored in the call's context slot \a elem (nullptr if
   never set). */
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
  return call->context[elem].value;
}
1999
/* Returns nonzero iff \a call is a client-side call. */
uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2001
2002 grpc_compression_algorithm grpc_call_compression_for_level(
2003     grpc_call* call, grpc_compression_level level) {
2004   grpc_compression_algorithm algo =
2005       compression_algorithm_for_level_locked(call, level);
2006   return algo;
2007 }
2008
2009 const char* grpc_call_error_to_string(grpc_call_error error) {
2010   switch (error) {
2011     case GRPC_CALL_ERROR:
2012       return "GRPC_CALL_ERROR";
2013     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2014       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2015     case GRPC_CALL_ERROR_ALREADY_FINISHED:
2016       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2017     case GRPC_CALL_ERROR_ALREADY_INVOKED:
2018       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2019     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2020       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2021     case GRPC_CALL_ERROR_INVALID_FLAGS:
2022       return "GRPC_CALL_ERROR_INVALID_FLAGS";
2023     case GRPC_CALL_ERROR_INVALID_MESSAGE:
2024       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2025     case GRPC_CALL_ERROR_INVALID_METADATA:
2026       return "GRPC_CALL_ERROR_INVALID_METADATA";
2027     case GRPC_CALL_ERROR_NOT_INVOKED:
2028       return "GRPC_CALL_ERROR_NOT_INVOKED";
2029     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2030       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2031     case GRPC_CALL_ERROR_NOT_ON_SERVER:
2032       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2033     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2034       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2035     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2036       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2037     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2038       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2039     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2040       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2041     case GRPC_CALL_OK:
2042       return "GRPC_CALL_OK";
2043   }
2044   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2045 }