3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
27 #include <grpc/compression.h>
28 #include <grpc/grpc.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/compression/algorithm_metadata.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/alloc.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gpr/time_precise.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/arena.h"
42 #include "src/core/lib/gprpp/manual_constructor.h"
43 #include "src/core/lib/gprpp/ref_counted.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/profiling/timers.h"
46 #include "src/core/lib/slice/slice_string_helpers.h"
47 #include "src/core/lib/slice/slice_utils.h"
48 #include "src/core/lib/surface/api_trace.h"
49 #include "src/core/lib/surface/call.h"
50 #include "src/core/lib/surface/call_test_only.h"
51 #include "src/core/lib/surface/channel.h"
52 #include "src/core/lib/surface/completion_queue.h"
53 #include "src/core/lib/surface/server.h"
54 #include "src/core/lib/surface/validate_metadata.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/metadata.h"
57 #include "src/core/lib/transport/static_metadata.h"
58 #include "src/core/lib/transport/status_metadata.h"
59 #include "src/core/lib/transport/transport.h"
// NOTE(review): the leading integers on every line below are artifacts of a
// numbered dump of this file; they are preserved verbatim.
61 /** The maximum number of concurrent batches possible.
62 Based upon the maximum number of individually queueable ops in the batch
64 - initial metadata send
66 - status/close send (depending on client/server)
67 - initial metadata recv
69 - status/close recv (depending on client/server) */
70 #define MAX_CONCURRENT_BATCHES 6
// Upper bound on caller-supplied metadata elements attached at call creation
// (client initial metadata / server trailing metadata).
72 #define MAX_SEND_EXTRA_METADATA_COUNT 3
74 // Used to create arena for the first call.
75 #define ESTIMATED_MDELEM_COUNT 16
// Book-keeping for one in-flight op batch on a call.  Allocated out of the
// call's fixed active_batches[] array (see grpc_call below).
77 struct batch_control {
78 batch_control() = default;
80 grpc_call* call = nullptr;
81 grpc_transport_stream_op_batch op;
82 /* Share memory for cq_completion and notify_tag as they are never needed
83 simultaneously. Each byte used in this data structure count as six bytes
84 per call, so any savings we can make are worthwhile,
86 We use notify_tag to determine whether or not to send notification to the
87 completion queue. Once we've made that determination, we can reuse the
88 memory for cq_completion. */
90 grpc_cq_completion cq_completion;
92 /* Any given op indicates completion by either (a) calling a closure or
93 (b) sending a notification on the call's completion queue. If
94 \a is_closure is true, \a tag indicates a closure to be invoked;
95 otherwise, \a tag indicates the tag to be used in the notification to
96 be sent to the completion queue. */
101 grpc_closure start_batch;
102 grpc_closure finish_batch;
// Number of constituent steps still outstanding; the batch completes when
// this counter is decremented to zero (see completed_batch_step()).
103 grpc_core::Atomic<intptr_t> steps_to_complete;
// First error observed while processing this batch, stored as an atomic so
// concurrent steps can publish it without a lock.
104 gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
105 void set_num_steps_to_complete(uintptr_t steps) {
106 steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
// Returns true iff the caller performed the final step of the batch.
108 bool completed_batch_step() {
109 return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
// Lazily-created parent-side state tracking a call's children; guarded by
// child_list_mu (see get_or_create_parent_call()).
114 parent_call() { gpr_mu_init(&child_list_mu); }
115 ~parent_call() { gpr_mu_destroy(&child_list_mu); }
117 gpr_mu child_list_mu;
118 grpc_call* first_child = nullptr;
// Per-child linkage: each child call keeps its position in the parent's
// circular sibling list.
122 child_call(grpc_call* parent) : parent(parent) {}
124 /** siblings: children of the same parent form a list, and this list is
127 grpc_call* sibling_next = nullptr;
128 grpc_call* sibling_prev = nullptr;
// Sentinel values for grpc_call::recv_state (a batch_control* is stored
// there otherwise — see the state diagram on recv_state below).
131 #define RECV_NONE ((gpr_atm)0)
132 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
// Core per-call state.  The grpc_call object, its call stack, and (optionally)
// its child_call are co-allocated in a single arena block (see
// grpc_call_create and the CALL_STACK_FROM_CALL macros).
135 grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
138 channel(args.channel),
139 is_client(args.server_transport_data == nullptr),
140 stream_op_payload(context) {
// All four metadata batches start with an infinite deadline.
141 for (int i = 0; i < 2; i++) {
142 for (int j = 0; j < 2; j++) {
143 metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
// error_string is heap-allocated by grpc_error_get_status; release it here.
149 gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
// External (application-visible) refcount; internal refs live on the stack.
152 grpc_core::RefCount ext_ref;
153 grpc_core::Arena* arena;
154 grpc_core::CallCombiner call_combiner;
155 grpc_completion_queue* cq;
156 grpc_polling_entity pollent;
157 grpc_channel* channel;
158 gpr_cycle_counter start_time = gpr_get_cycle_counter();
159 /* parent_call* */ gpr_atm parent_call_atm = 0;
160 child_call* child = nullptr;
162 /* client or server call */
164 /** has grpc_call_unref been called */
165 bool destroy_called = false;
166 /** flag indicating that cancellation is inherited */
167 bool cancellation_is_inherited = false;
168 /** which ops are in-flight */
169 bool sent_initial_metadata = false;
170 bool sending_message = false;
171 bool sent_final_op = false;
172 bool received_initial_metadata = false;
173 bool receiving_message = false;
174 bool requested_final_op = false;
175 gpr_atm any_ops_sent_atm = 0;
176 gpr_atm received_final_op_atm = 0;
178 batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
179 grpc_transport_stream_op_batch_payload stream_op_payload;
181 /* first idx: is_receiving, second idx: is_trailing */
182 grpc_metadata_batch metadata_batch[2][2] = {};
184 /* Buffered read metadata waiting to be returned to the application.
185 Element 0 is initial metadata, element 1 is trailing metadata. */
186 grpc_metadata_array* buffered_metadata[2] = {};
188 grpc_metadata compression_md;
190 // A char* indicating the peer name.
191 gpr_atm peer_string = 0;
193 /* Call data useful used for reporting. Only valid after the call has
195 grpc_call_final_info final_info;
197 /* Compression algorithm for *incoming* data */
198 grpc_message_compression_algorithm incoming_message_compression_algorithm =
199 GRPC_MESSAGE_COMPRESS_NONE;
200 /* Stream compression algorithm for *incoming* data */
201 grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
202 GRPC_STREAM_COMPRESS_NONE;
203 /* Supported encodings (compression algorithms), a bitset.
204 * Always support no compression. */
205 uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
206 /* Supported stream encodings (stream compression algorithms), a bitset */
207 uint32_t stream_encodings_accepted_by_peer = 0;
209 /* Contexts for various subsystems (security, tracing, ...). */
210 grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
212 /* for the client, extra metadata is initial metadata; for the
213 server, it's trailing metadata */
214 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
215 int send_extra_metadata_count;
216 grpc_millis send_deadline;
218 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
220 grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
221 grpc_byte_buffer** receiving_buffer = nullptr;
222 grpc_slice receiving_slice = grpc_empty_slice();
223 grpc_closure receiving_slice_ready;
224 grpc_closure receiving_stream_ready;
225 grpc_closure receiving_initial_metadata_ready;
226 grpc_closure receiving_trailing_metadata_ready;
227 uint32_t test_only_last_message_flags = 0;
// Set (CAS 0->1) by cancel_with_error; guarantees at most one cancel batch.
228 gpr_atm cancelled = 0;
230 grpc_closure release_call;
// Client-side final-op out-pointers, filled in by set_final_status().
234 grpc_status_code* status;
235 grpc_slice* status_details;
236 const char** error_string;
240 // backpointer to owning server if this is a server side call.
// grpc_error* holding the call's final status, stored atomically.
244 gpr_atm status_error = 0;
246 /* recv_state can contain one of the following values:
247 RECV_NONE : : no initial metadata and messages received
248 RECV_INITIAL_METADATA_FIRST : received initial metadata first
249 a batch_control* : received messages first
251 +------1------RECV_NONE------3-----+
255 RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
258 +-----2-----+ +-----4-----+
260 For 1, 4: See receiving_initial_metadata_ready() function
261 For 2, 3: See receiving_stream_ready() function */
262 gpr_atm recv_state = 0;
// Trace flags toggled via GRPC_TRACE=call_error / compression.
265 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
266 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
// The call object and its filter stack are laid out back-to-back in one
// allocation, so the two can be converted with pointer arithmetic.
268 #define CALL_STACK_FROM_CALL(call) \
269 (grpc_call_stack*)((char*)(call) + \
270 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
271 #define CALL_FROM_CALL_STACK(call_stack) \
272 (grpc_call*)(((char*)(call_stack)) - \
273 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
275 #define CALL_ELEM_FROM_CALL(call, idx) \
276 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
277 #define CALL_FROM_TOP_ELEM(top_elem) \
278 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
// Forward declarations of file-local helpers defined further below.
280 static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
281 grpc_closure* start_batch_closure);
283 static void cancel_with_status(grpc_call* c, grpc_status_code status,
284 const char* description);
285 static void cancel_with_error(grpc_call* c, grpc_error* error);
286 static void destroy_call(void* call_stack, grpc_error* error);
287 static void receiving_slice_ready(void* bctlp, grpc_error* error);
288 static void set_final_status(grpc_call* call, grpc_error* error);
289 static void process_data_after_md(batch_control* bctl);
290 static void post_batch_completion(batch_control* bctl);
292 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
293 if (new_err == GRPC_ERROR_NONE) return;
294 if (*composite == GRPC_ERROR_NONE)
295 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
296 *composite = grpc_error_add_child(*composite, new_err);
299 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
300 return call->arena->Alloc(size);
// Returns the call's parent_call bookkeeping struct, creating it on first
// use.  The CAS publishes the new struct; on a lost race the winner's copy
// is re-loaded (the loser's arena allocation is simply abandoned — arena
// memory is only reclaimed wholesale).
303 static parent_call* get_or_create_parent_call(grpc_call* call) {
304 parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
306 p = call->arena->New<parent_call>();
307 if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
310 p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
316 static parent_call* get_parent_call(grpc_call* call) {
317 return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
320 size_t grpc_call_get_initial_size_estimate() {
321 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
322 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
// Creates a call: co-allocates the grpc_call, its filter stack and (if a
// parent is given) the child_call record in one arena block, wires up
// deadline/census/cancellation propagation from the parent, initializes the
// call stack, publishes the call into the parent's sibling list, and binds
// polling.  Returns an aggregate error (see add_init_error) on failure.
325 grpc_error* grpc_call_create(const grpc_call_create_args* args,
326 grpc_call** out_call) {
327 GPR_TIMER_SCOPE("grpc_call_create", 0);
329 GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
331 grpc_core::Arena* arena;
333 grpc_error* error = GRPC_ERROR_NONE;
334 grpc_channel_stack* channel_stack =
335 grpc_channel_get_channel_stack(args->channel);
336 size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
337 GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
338 size_t call_and_stack_size =
339 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
340 channel_stack->call_stack_size;
341 size_t call_alloc_size =
342 call_and_stack_size + (args->parent ? sizeof(child_call) : 0);
344 std::pair<grpc_core::Arena*, void*> arena_with_call =
345 grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
346 arena = arena_with_call.first;
347 call = new (arena_with_call.second) grpc_call(arena, *args);
349 grpc_slice path = grpc_empty_slice();
350 if (call->is_client) {
351 call->final_op.client.status_details = nullptr;
352 call->final_op.client.status = nullptr;
353 call->final_op.client.error_string = nullptr;
354 GRPC_STATS_INC_CLIENT_CALLS_CREATED();
355 GPR_ASSERT(args->add_initial_metadata_count <
356 MAX_SEND_EXTRA_METADATA_COUNT);
// Remember the :path value (if supplied) for channelz/debug purposes.
357 for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
358 call->send_extra_metadata[i].md = args->add_initial_metadata[i];
359 if (grpc_slice_eq_static_interned(
360 GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
361 path = grpc_slice_ref_internal(
362 GRPC_MDVALUE(args->add_initial_metadata[i]));
365 call->send_extra_metadata_count =
366 static_cast<int>(args->add_initial_metadata_count);
368 GRPC_STATS_INC_SERVER_CALLS_CREATED();
369 call->final_op.server.cancelled = nullptr;
370 call->final_op.server.server = args->server;
// Servers never pre-attach initial metadata at creation time.
371 GPR_ASSERT(args->add_initial_metadata_count == 0);
372 call->send_extra_metadata_count = 0;
375 grpc_millis send_deadline = args->send_deadline;
376 bool immediately_cancel = false;
378 if (args->parent != nullptr) {
// child_call is placed immediately after the call+stack region.
379 call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
380 call_and_stack_size) child_call(args->parent);
382 GRPC_CALL_INTERNAL_REF(args->parent, "child");
383 GPR_ASSERT(call->is_client);
384 GPR_ASSERT(!args->parent->is_client);
386 if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
387 send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
389 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
390 * GRPC_PROPAGATE_STATS_CONTEXT */
391 /* TODO(ctiller): This should change to use the appropriate census start_op
393 if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
394 if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
395 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
396 "Census tracing propagation requested "
397 "without Census context propagation"));
399 grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
400 args->parent->context[GRPC_CONTEXT_TRACING].value,
402 } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
403 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
404 "Census context propagation requested "
405 "without Census tracing propagation"));
407 if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
408 call->cancellation_is_inherited = 1;
// Parent already finished: the child must be cancelled as soon as its
// stack is initialized (done below, after grpc_call_stack_init).
409 if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
410 immediately_cancel = true;
414 call->send_deadline = send_deadline;
415 /* initial refcount dropped by grpc_call_unref */
416 grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
417 args->server_transport_data,
423 &call->call_combiner};
424 add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
426 // Publish this call to parent only after the call stack has been initialized.
427 if (args->parent != nullptr) {
428 child_call* cc = call->child;
429 parent_call* pc = get_or_create_parent_call(args->parent);
430 gpr_mu_lock(&pc->child_list_mu);
431 if (pc->first_child == nullptr) {
432 pc->first_child = call;
433 cc->sibling_next = cc->sibling_prev = call;
435 cc->sibling_next = pc->first_child;
436 cc->sibling_prev = pc->first_child->child->sibling_prev;
437 cc->sibling_next->child->sibling_prev =
438 cc->sibling_prev->child->sibling_next = call;
440 gpr_mu_unlock(&pc->child_list_mu);
443 if (error != GRPC_ERROR_NONE) {
444 cancel_with_error(call, GRPC_ERROR_REF(error));
446 if (immediately_cancel) {
447 cancel_with_error(call, GRPC_ERROR_CANCELLED);
449 if (args->cq != nullptr) {
450 GPR_ASSERT(args->pollset_set_alternative == nullptr &&
451 "Only one of 'cq' and 'pollset_set_alternative' should be "
453 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
455 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
457 if (args->pollset_set_alternative != nullptr) {
458 call->pollent = grpc_polling_entity_create_from_pollset_set(
459 args->pollset_set_alternative);
461 if (!grpc_polling_entity_is_empty(&call->pollent)) {
462 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
// channelz accounting: record the call start on the channel (client) or
// server node, when channelz is enabled.
466 if (call->is_client) {
467 grpc_core::channelz::ChannelNode* channelz_channel =
468 grpc_channel_get_channelz_node(call->channel);
469 if (channelz_channel != nullptr) {
470 channelz_channel->RecordCallStarted();
473 grpc_core::channelz::ServerNode* channelz_server =
474 grpc_server_get_channelz_node(call->final_op.server.server);
475 if (channelz_server != nullptr) {
476 channelz_server->RecordCallStarted();
480 grpc_slice_unref_internal(path);
// Late-binds a completion queue to a call that was created without one.
// A call may be bound to either a cq or a pollset_set, never both.
485 void grpc_call_set_completion_queue(grpc_call* call,
486 grpc_completion_queue* cq) {
489 if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
490 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
// Internal cq ref held for the lifetime of the binding.
494 GRPC_CQ_INTERNAL_REF(cq, "bind");
495 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
496 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
// Internal refs delegate to the call stack's refcount.  In debug builds the
// textual reason is forwarded for ref tracing; in release builds it is
// compiled out (REF_REASON becomes "").
501 #define REF_REASON reason
502 #define REF_ARG , const char* reason
504 #define REF_REASON ""
507 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
508 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
510 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
511 GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
// Final teardown closure: destroys the arena (recycling its size as the next
// size estimate for the channel) and drops the channel ref taken at create.
// NOTE(review): channel/arena are copied to locals first because the call
// object itself lives inside the arena being destroyed.
514 static void release_call(void* call, grpc_error* error) {
515 grpc_call* c = static_cast<grpc_call*>(call);
516 grpc_channel* channel = c->channel;
517 grpc_core::Arena* arena = c->arena;
519 grpc_channel_update_call_size_estimate(channel, arena->Destroy());
520 GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
// Call-stack destruction callback: releases received metadata, streams,
// send-side mdelems and contexts, records final status/latency, then
// destroys the filter stack with release_call as the then-closure.
523 static void destroy_call(void* call, grpc_error* error) {
524 GPR_TIMER_SCOPE("destroy_call", 0);
527 grpc_call* c = static_cast<grpc_call*>(call);
528 for (i = 0; i < 2; i++) {
529 grpc_metadata_batch_destroy(
530 &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
532 c->receiving_stream.reset();
533 parent_call* pc = get_parent_call(c);
537 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
538 GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
540 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
541 if (c->context[i].destroy) {
542 c->context[i].destroy(c->context[i].value);
546 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
// Translate the stored status error into final_info for stats plugins.
549 grpc_error* status_error =
550 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
551 grpc_error_get_status(status_error, c->send_deadline,
552 &c->final_info.final_status, nullptr, nullptr,
553 &(c->final_info.error_string));
554 GRPC_ERROR_UNREF(status_error);
555 c->final_info.stats.latency =
556 gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
557 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
558 GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
559 grpc_schedule_on_exec_ctx));
562 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
// Drops an application reference.  On the last external unref: unlinks the
// call from its parent's sibling ring, cancels the call if ops were sent but
// no final op was received, flushes pending cancel notifications, and drops
// the internal "destroy" ref.
564 void grpc_call_unref(grpc_call* c) {
565 if (GPR_LIKELY(!c->ext_ref.Unref())) return;
567 GPR_TIMER_SCOPE("grpc_call_unref", 0);
569 child_call* cc = c->child;
570 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
571 grpc_core::ExecCtx exec_ctx;
573 GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
// Remove this call from the parent's circular child list.
576 parent_call* pc = get_parent_call(cc->parent);
577 gpr_mu_lock(&pc->child_list_mu);
578 if (c == pc->first_child) {
579 pc->first_child = cc->sibling_next;
// Single-element ring: the list becomes empty.
580 if (c == pc->first_child) {
581 pc->first_child = nullptr;
584 cc->sibling_prev->child->sibling_next = cc->sibling_next;
585 cc->sibling_next->child->sibling_prev = cc->sibling_prev;
586 gpr_mu_unlock(&pc->child_list_mu);
587 GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
590 GPR_ASSERT(!c->destroy_called);
591 c->destroy_called = 1;
592 bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
593 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
595 cancel_with_error(c, GRPC_ERROR_CANCELLED);
597 // Unset the call combiner cancellation closure. This has the
598 // effect of scheduling the previously set cancellation closure, if
599 // any, so that it can release any internal references it may be
600 // holding to the call stack. Also flush the closures on exec_ctx so that
601 // filters that schedule cancel notification closures on exec_ctx do not
602 // need to take a ref of the call stack to guarantee closure liveness.
603 c->call_combiner.SetNotifyOnCancel(nullptr);
604 grpc_core::ExecCtx::Get()->Flush();
606 GRPC_CALL_INTERNAL_UNREF(c, "destroy");
// Public cancel entry point; always cancels with GRPC_ERROR_CANCELLED.
// 'reserved' must be null.
609 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
610 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
611 GPR_ASSERT(!reserved);
612 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
613 grpc_core::ExecCtx exec_ctx;
614 cancel_with_error(call, GRPC_ERROR_CANCELLED);
618 // This is called via the call combiner to start sending a batch down
620 static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
621 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
622 grpc_transport_stream_op_batch* batch =
623 static_cast<grpc_transport_stream_op_batch*>(arg);
624 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
625 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
626 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
627 elem->filter->start_transport_stream_op_batch(elem, batch);
630 // start_batch_closure points to a caller-allocated closure to be used
631 // for entering the call combiner.
632 static void execute_batch(grpc_call* call,
633 grpc_transport_stream_op_batch* batch,
634 grpc_closure* start_batch_closure) {
635 batch->handler_private.extra_arg = call;
636 GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
637 grpc_schedule_on_exec_ctx);
638 GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
639 GRPC_ERROR_NONE, "executing batch");
642 char* grpc_call_get_peer(grpc_call* call) {
643 char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
644 if (peer_string != nullptr) return gpr_strdup(peer_string);
645 peer_string = grpc_channel_get_target(call->channel);
646 if (peer_string != nullptr) return peer_string;
647 return gpr_strdup("unknown");
650 grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
651 return CALL_FROM_TOP_ELEM(elem);
654 /*******************************************************************************
// Public cancel-with-status entry point.  'description' may be short-lived;
// it is copied by error_from_status.  'reserved' must be null.
658 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
659 grpc_status_code status,
660 const char* description,
662 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
663 grpc_core::ExecCtx exec_ctx;
665 "grpc_call_cancel_with_status("
666 "c=%p, status=%d, description=%s, reserved=%p)",
667 4, (c, (int)status, description, reserved));
668 GPR_ASSERT(reserved == nullptr);
669 cancel_with_status(c, status, description);
// Heap-allocated state for an asynchronous cancel_stream batch: the two
// closures used to enter the combiner and observe batch completion.
675 grpc_closure start_batch;
676 grpc_closure finish_batch;
679 // The on_complete callback used when sending a cancel_stream batch down
680 // the filter stack. Yields the call combiner when the batch is done.
681 static void done_termination(void* arg, grpc_error* error) {
682 cancel_state* state = static_cast<cancel_state*>(arg);
683 GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
684 "on_complete for cancel_stream op");
// Drops the "termination" ref taken in cancel_with_error.
685 GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
// Cancels the call with \a error (ownership transferred).  The CAS on
// 'cancelled' guarantees only the first caller sends the cancel_stream
// batch; later callers just drop their error.
689 static void cancel_with_error(grpc_call* c, grpc_error* error) {
690 if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
691 GRPC_ERROR_UNREF(error);
// Ref held until done_termination runs.
694 GRPC_CALL_INTERNAL_REF(c, "termination");
695 // Inform the call combiner of the cancellation, so that it can cancel
696 // any in-flight asynchronous actions that may be holding the call
697 // combiner. This ensures that the cancel_stream batch can be sent
698 // down the filter stack in a timely manner.
699 c->call_combiner.Cancel(GRPC_ERROR_REF(error));
700 cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
702 GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
703 grpc_schedule_on_exec_ctx);
704 grpc_transport_stream_op_batch* op =
705 grpc_make_transport_stream_op(&state->finish_batch);
706 op->cancel_stream = true;
707 op->payload->cancel_stream.cancel_error = error;
708 execute_batch(c, op, &state->start_batch);
711 void grpc_call_cancel_internal(grpc_call* call) {
712 cancel_with_error(call, GRPC_ERROR_CANCELLED);
715 static grpc_error* error_from_status(grpc_status_code status,
716 const char* description) {
717 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
718 // guarantee that can be short-lived.
719 return grpc_error_set_int(
720 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
721 GRPC_ERROR_STR_GRPC_MESSAGE,
722 grpc_slice_from_copied_string(description)),
723 GRPC_ERROR_INT_GRPC_STATUS, status);
726 static void cancel_with_status(grpc_call* c, grpc_status_code status,
727 const char* description) {
728 cancel_with_error(c, error_from_status(status, description));
// Publishes the call's final status.  Client side: decodes \a error into the
// application's out-pointers and stores the error for later reporting.
// Server side: sets the 'cancelled' out-flag.  Both sides record the outcome
// in channelz.  Takes ownership of \a error (server path unrefs it; client
// path stores it in status_error).
731 static void set_final_status(grpc_call* call, grpc_error* error) {
732 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
733 gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
734 gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
736 if (call->is_client) {
737 grpc_error_get_status(error, call->send_deadline,
738 call->final_op.client.status,
739 call->final_op.client.status_details, nullptr,
740 call->final_op.client.error_string);
741 // explicitly take a ref
742 grpc_slice_ref_internal(*call->final_op.client.status_details);
743 gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
744 grpc_core::channelz::ChannelNode* channelz_channel =
745 grpc_channel_get_channelz_node(call->channel);
746 if (channelz_channel != nullptr) {
747 if (*call->final_op.client.status != GRPC_STATUS_OK) {
748 channelz_channel->RecordCallFailed();
750 channelz_channel->RecordCallSucceeded();
// Server side: the call is "cancelled" if there is an explicit error or a
// non-OK status was previously recorded.
754 *call->final_op.server.cancelled =
755 error != GRPC_ERROR_NONE ||
756 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
758 grpc_core::channelz::ServerNode* channelz_server =
759 grpc_server_get_channelz_node(call->final_op.server.server);
760 if (channelz_server != nullptr) {
761 if (*call->final_op.server.cancelled) {
762 channelz_server->RecordCallFailed();
764 channelz_server->RecordCallSucceeded();
767 GRPC_ERROR_UNREF(error);
771 /*******************************************************************************
775 static void set_incoming_message_compression_algorithm(
776 grpc_call* call, grpc_message_compression_algorithm algo) {
777 GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
778 call->incoming_message_compression_algorithm = algo;
781 static void set_incoming_stream_compression_algorithm(
782 grpc_call* call, grpc_stream_compression_algorithm algo) {
783 GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
784 call->incoming_stream_compression_algorithm = algo;
// Test-only accessor: folds the call's incoming message- and stream-level
// compression settings into a single grpc_compression_algorithm.
787 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
789 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
790 grpc_compression_algorithm_from_message_stream_compression_algorithm(
791 &algorithm, call->incoming_message_compression_algorithm,
792 call->incoming_stream_compression_algorithm);
796 static grpc_compression_algorithm compression_algorithm_for_level_locked(
797 grpc_call* call, grpc_compression_level level) {
798 return grpc_compression_algorithm_for_level(level,
799 call->encodings_accepted_by_peer);
// Test-only accessor: flags of the most recently received message.
802 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
804 flags = call->test_only_last_message_flags;
// User-data destructor for the accept-encoding cache stored on the mdelem
// (see set_encodings_accepted_by_peer).  The cached bitset is encoded
// directly in the pointer value (bitset + 1), so there is nothing to free.
// The redundant `return;` in the original void body has been dropped and the
// unused parameter left unnamed.
static void destroy_encodings_accepted_by_peer(void* /*ignored*/) {}
// Parses the peer's accept-encoding metadata value (a comma-separated list)
// into a bitset of supported algorithms, caching the result on the mdelem as
// user data (stored as bitset+1 so that 0 means "not yet computed").
810 static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel,
811 uint32_t* encodings_accepted_by_peer,
812 bool stream_encoding) {
815 grpc_slice_buffer accept_encoding_parts;
816 grpc_slice accept_encoding_slice;
817 void* accepted_user_data;
// Fast path: a previous call already parsed this interned mdelem.
820 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
821 if (accepted_user_data != nullptr) {
822 *encodings_accepted_by_peer =
823 static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
827 *encodings_accepted_by_peer = 0;
829 accept_encoding_slice = GRPC_MDVALUE(mdel);
830 grpc_slice_buffer_init(&accept_encoding_parts);
831 grpc_slice_split_without_space(accept_encoding_slice, ",",
832 &accept_encoding_parts);
// Identity (no compression) is always acceptable.
834 GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
835 for (i = 0; i < accept_encoding_parts.count; i++) {
837 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
838 if (!stream_encoding) {
839 r = grpc_message_compression_algorithm_parse(
840 accept_encoding_entry_slice,
841 reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
843 r = grpc_stream_compression_algorithm_parse(
844 accept_encoding_entry_slice,
845 reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
848 GPR_BITSET(encodings_accepted_by_peer, algorithm);
// Unknown entries are logged and skipped, never fatal.
850 char* accept_encoding_entry_str =
851 grpc_slice_to_c_string(accept_encoding_entry_slice);
853 "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
854 accept_encoding_entry_str);
855 gpr_free(accept_encoding_entry_str);
859 grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
// Cache result on the mdelem; +1 distinguishes "empty bitset" from "unset".
861 grpc_mdelem_set_user_data(
862 mdel, destroy_encodings_accepted_by_peer,
863 (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
866 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
867 uint32_t encodings_accepted_by_peer;
868 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
869 return encodings_accepted_by_peer;
872 grpc_stream_compression_algorithm
873 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
874 return call->incoming_stream_compression_algorithm;
877 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
878 return (grpc_linked_mdelem*)&md->internal_data;
// Indexes the logical concatenation [metadata..count) ++ additional_metadata:
// i < count selects from the first array, otherwise from the second.
881 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
882 grpc_metadata* additional_metadata, int i,
885 i < count ? &metadata[i] : &additional_metadata[i - count];
// Validates and links application-supplied metadata (plus any call-level
// extra metadata) into the call's send batch.  On a validation failure all
// mdelems created so far are unreffed and the failure is reported; returns
// int (success/failure) per the original contract.
890 static int prepare_application_metadata(grpc_call* call, int count,
891 grpc_metadata* metadata,
893 int prepend_extra_metadata,
894 grpc_metadata* additional_metadata,
895 int additional_metadata_count) {
896 int total_count = count + additional_metadata_count;
898 grpc_metadata_batch* batch =
899 &call->metadata_batch[0 /* is_receiving */][is_trailing];
// Pass 1: validate each element and materialize its mdelem in place.
900 for (i = 0; i < total_count; i++) {
901 const grpc_metadata* md =
902 get_md_elem(metadata, additional_metadata, i, count);
903 grpc_linked_mdelem* l = linked_from_md(md);
904 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
905 if (!GRPC_LOG_IF_ERROR("validate_metadata",
906 grpc_validate_header_key_is_legal(md->key))) {
908 } else if (!grpc_is_binary_header_internal(md->key) &&
911 grpc_validate_header_nonbin_value_is_legal(md->value))) {
913 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
914 // HTTP2 hpack encoding has a maximum limit.
917 l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
// Validation stopped early: roll back the mdelems created so far.
919 if (i != total_count) {
920 for (int j = 0; j < i; j++) {
921 const grpc_metadata* md =
922 get_md_elem(metadata, additional_metadata, j, count);
923 grpc_linked_mdelem* l = linked_from_md(md);
924 GRPC_MDELEM_UNREF(l->md);
928 if (prepend_extra_metadata) {
929 if (call->send_extra_metadata_count == 0) {
930 prepend_extra_metadata = 0;
932 for (i = 0; i < call->send_extra_metadata_count; i++) {
933 GRPC_LOG_IF_ERROR("prepare_application_metadata",
934 grpc_metadata_batch_link_tail(
935 batch, &call->send_extra_metadata[i]));
// Pass 2: link the validated elements into the batch tail.
939 for (i = 0; i < total_count; i++) {
940 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
941 grpc_linked_mdelem* l = linked_from_md(md);
942 grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
943 if (error != GRPC_ERROR_NONE) {
944 GRPC_MDELEM_UNREF(l->md);
946 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
948 call->send_extra_metadata_count = 0;
// Decodes a message-compression algorithm from incoming metadata; unknown
// values are logged and treated as uncompressed rather than failing.
953 static grpc_message_compression_algorithm decode_message_compression(
955 grpc_message_compression_algorithm algorithm =
956 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
957 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
958 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
960 "Invalid incoming message compression algorithm: '%s'. "
961 "Interpreting incoming data as uncompressed.",
964 return GRPC_MESSAGE_COMPRESS_NONE;
// Stream-compression analogue of decode_message_compression(): decodes
// the content-encoding metadata value; unknown values are reported and
// downgraded to GRPC_STREAM_COMPRESS_NONE (data treated as uncompressed).
969 static grpc_stream_compression_algorithm decode_stream_compression(
971   grpc_stream_compression_algorithm algorithm =
972       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
973   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
// ALGORITHMS_COUNT is the parser's "unknown value" sentinel.
974     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
976         "Invalid incoming stream compression algorithm: '%s'. Interpreting "
977         "incoming data as uncompressed.",
980     return GRPC_STREAM_COMPRESS_NONE;
// Copies the metadata elements of batch |b| into the application-visible
// grpc_metadata_array registered for this call (initial metadata when
// is_trailing == 0, trailing when 1). The key/value slices handed to the
// application are borrowed, not ref'd — valid only while the call lives.
985 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
987   if (b->list.count == 0) return;
// Servers never surface received "trailing" metadata to the application.
988   if (!call->is_client && is_trailing) return;
// Nothing to do if the app did not register a trailing-metadata array.
989   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
990   GPR_TIMER_SCOPE("publish_app_metadata", 0);
991   grpc_metadata_array* dest;
992   grpc_metadata* mdusr;
993   dest = call->buffered_metadata[is_trailing];
994   if (dest->count + b->list.count > dest->capacity) {
// Grow at least 1.5x to amortize reallocation cost.
996         GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
997     dest->metadata = static_cast<grpc_metadata*>(
998         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1000   for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1001     mdusr = &dest->metadata[dest->count++];
1002     /* we pass back borrowed slices that are valid whilst the call is valid */
1003     mdusr->key = GRPC_MDKEY(l->md);
1004     mdusr->value = GRPC_MDVALUE(l->md);
// Filters received initial metadata before it reaches the application:
// extracts and strips the compression-related headers (content-encoding,
// grpc-encoding, grpc-accept-encoding, accept-encoding), records the
// resulting compression settings on the call, then publishes whatever
// metadata remains to the application via publish_app_metadata().
1008 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1009   if (b->idx.named.content_encoding != nullptr) {
1010     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1011     set_incoming_stream_compression_algorithm(
1012         call, decode_stream_compression(b->idx.named.content_encoding->md));
1013     grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
1015   if (b->idx.named.grpc_encoding != nullptr) {
1016     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1017     set_incoming_message_compression_algorithm(
1018         call, decode_message_compression(b->idx.named.grpc_encoding->md));
1019     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
// Bit 0 (identity / no compression) is always accepted by default.
1021   uint32_t message_encodings_accepted_by_peer = 1u;
1022   uint32_t stream_encodings_accepted_by_peer = 1u;
1023   if (b->idx.named.grpc_accept_encoding != nullptr) {
1024     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1025     set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
1026                                    &message_encodings_accepted_by_peer, false);
1027     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1029   if (b->idx.named.accept_encoding != nullptr) {
1030     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1031     set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
1032                                    &stream_encodings_accepted_by_peer, true);
1033     grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
// Fold the two per-kind bitsets into the call's combined accepted set.
1035   call->encodings_accepted_by_peer =
1036       grpc_compression_bitset_from_message_stream_compression_bitset(
1037           message_encodings_accepted_by_peer,
1038           stream_encodings_accepted_by_peer);
1039   publish_app_metadata(call, b, false);
// Derives the call's final status from received trailing metadata.
// Precedence: an explicit transport batch_error wins; otherwise the
// grpc-status (plus optional grpc-message) headers are decoded; otherwise
// a server call completes OK while a client call (elided branch below)
// is failed with GRPC_STATUS_UNKNOWN ("No status received"). Consumed
// status headers are removed from the batch; the rest is published to
// the application as trailing metadata.
1042 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1043                                  grpc_error* batch_error) {
1044   grpc_call* call = static_cast<grpc_call*>(args);
1045   if (batch_error != GRPC_ERROR_NONE) {
// set_final_status takes ownership of batch_error.
1046     set_final_status(call, batch_error);
1047   } else if (b->idx.named.grpc_status != nullptr) {
1048     grpc_status_code status_code =
1049         grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1050     grpc_error* error = GRPC_ERROR_NONE;
1051     if (status_code != GRPC_STATUS_OK) {
1052       char* peer_msg = nullptr;
1053       char* peer = grpc_call_get_peer(call);
1054       gpr_asprintf(&peer_msg, "Error received from peer %s", peer);
1055       error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(peer_msg),
1056                                  GRPC_ERROR_INT_GRPC_STATUS,
1057                                  static_cast<intptr_t>(status_code));
1061     if (b->idx.named.grpc_message != nullptr) {
// Attach the peer-supplied message; the slice is ref'd because the
// batch element is removed (and released) immediately afterwards.
1062       error = grpc_error_set_str(
1063           error, GRPC_ERROR_STR_GRPC_MESSAGE,
1064           grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1065       grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
1066     } else if (error != GRPC_ERROR_NONE) {
// Non-OK status with no grpc-message: record an empty message.
1067       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1068                                  grpc_empty_slice());
// set_final_status consumes a ref; drop our local ref afterwards.
1070     set_final_status(call, GRPC_ERROR_REF(error));
1071     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
1072     GRPC_ERROR_UNREF(error);
1073   } else if (!call->is_client) {
1074     set_final_status(call, GRPC_ERROR_NONE);
1077         "Received trailing metadata with no error and no status");
1079         call, grpc_error_set_int(
1080                   GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1081                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1083   publish_app_metadata(call, b, true);
1086 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
// Accessor: returns the channel-stack call object allocated alongside
// this grpc_call (layout relationship encoded by CALL_STACK_FROM_CALL).
1088 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1089   return CALL_STACK_FROM_CALL(call);
1092 /*******************************************************************************
1093 * BATCH API IMPLEMENTATION
// Returns true iff |flags| contains no bits outside the public and
// internal write-flag masks (i.e. the flags on a send-message op are
// all recognized).
1096 static bool are_write_flags_valid(uint32_t flags) {
1097   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1098   const uint32_t allowed_write_positions =
1099       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1100   const uint32_t invalid_positions = ~allowed_write_positions;
1101   return !(flags & invalid_positions);
// Returns true iff the flags on a send-initial-metadata op are all
// recognized. The idempotent-request bit is additionally folded into the
// invalid set under a condition elided from this view — presumably when
// !is_client, given the parameter; TODO(review): confirm against the
// unelided source.
1104 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1105   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1106   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1108     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1110   return !(flags & invalid_positions);
// Maps an op type to the index of the active-batch slot it occupies in
// call->active_batches, so at most one in-flight batch exists per slot.
// The concrete return values are elided from this view; note that
// several op types share a slot (their cases fall through together).
// The default case is unreachable for valid op types.
1113 static size_t batch_slot_for_op(grpc_op_type type) {
1115     case GRPC_OP_SEND_INITIAL_METADATA:
1117     case GRPC_OP_SEND_MESSAGE:
1119     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1120     case GRPC_OP_SEND_STATUS_FROM_SERVER:
1122     case GRPC_OP_RECV_INITIAL_METADATA:
1124     case GRPC_OP_RECV_MESSAGE:
1126     case GRPC_OP_RECV_CLOSE_ON_SERVER:
1127     case GRPC_OP_RECV_STATUS_ON_CLIENT:
// Sentinel value; GPR_UNREACHABLE_CODE aborts before it is returned.
1130       GPR_UNREACHABLE_CODE(return 123456789);
// Obtains the batch_control for the slot implied by the batch's first
// op: reuses the slot's previous batch_control if it has completed
// (destroying it in place before reconstruction), or arena-allocates a
// fresh one. A non-null bctl->call indicates the slot's previous batch
// is still in flight — callers treat a failure here as "too many
// concurrent operations".
1133 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1136   size_t slot_idx = batch_slot_for_op(ops[0].op);
1137   batch_control** pslot = &call->active_batches[slot_idx];
1138   batch_control* bctl;
1139   if (*pslot != nullptr) {
1141     if (bctl->call != nullptr) {
// Explicit destructor call: storage is arena-owned and reused in place.
1144       bctl->~batch_control();
1147     bctl = call->arena->New<batch_control>();
1151   bctl->op.payload = &call->stream_op_payload;
// Completion-queue done-callback for a batch: marks the batch_control as
// free for reuse (call = nullptr) and drops the internal "completion"
// ref taken when the batch was started.
1155 static void finish_batch_completion(void* user_data,
1156                                     grpc_cq_completion* storage) {
1157   batch_control* bctl = static_cast<batch_control*>(user_data);
1158   grpc_call* call = bctl->call;
1159   bctl->call = nullptr;
1160   GRPC_CALL_INTERNAL_UNREF(call, "completion");
// Releases the error currently stored in bctl->batch_error and resets
// the atomic slot to GRPC_ERROR_NONE (acquire load pairs with the
// release stores done by the per-op completion callbacks).
1163 static void reset_batch_errors(batch_control* bctl) {
1165       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1166   gpr_atm_rel_store(&bctl->batch_error,
1167                     reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
// Runs once every step of a batch has finished: tears down per-op state,
// propagates cancellation to children on final-op completion, and then
// notifies the application either by running the notify closure or by
// posting a completion on the call's completion queue.
1170 static void post_batch_completion(batch_control* bctl) {
1171   grpc_call* next_child_call;
1172   grpc_call* call = bctl->call;
// Take our own ref on the accumulated batch error; reset_batch_errors()
// below releases the stored one.
1173   grpc_error* error = GRPC_ERROR_REF(
1174       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1176   if (bctl->op.send_initial_metadata) {
1177     grpc_metadata_batch_destroy(
1178         &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1180   if (bctl->op.send_message) {
1181     if (bctl->op.payload->send_message.stream_write_closed &&
1182         error == GRPC_ERROR_NONE) {
// Surface a send-after-close as an error even though the batch itself
// reported none.
1183       error = grpc_error_add_child(
1184           error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1185                      "Attempt to send message after stream was closed."));
1187     call->sending_message = false;
1189   if (bctl->op.send_trailing_metadata) {
1190     grpc_metadata_batch_destroy(
1191         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1193   if (bctl->op.recv_trailing_metadata) {
1194     /* propagate cancellation to any interested children */
1195     gpr_atm_rel_store(&call->received_final_op_atm, 1);
1196     parent_call* pc = get_parent_call(call);
1197     if (pc != nullptr) {
1199       gpr_mu_lock(&pc->child_list_mu);
1200       child = pc->first_child;
1201       if (child != nullptr) {
// Walk the circular sibling list, cancelling children that inherit
// cancellation; ref/unref keeps each child alive across the cancel.
1203           next_child_call = child->child->sibling_next;
1204           if (child->cancellation_is_inherited) {
1205             GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1206             cancel_with_error(child, GRPC_ERROR_CANCELLED);
1207             GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1209           child = next_child_call;
1210         } while (child != pc->first_child);
1212       gpr_mu_unlock(&pc->child_list_mu);
// The final status was already delivered via recv_trailing_filter; the
// batch itself completes successfully.
1214     GRPC_ERROR_UNREF(error);
1215     error = GRPC_ERROR_NONE;
// On a failed recv_message, destroy any partially-received buffer so the
// application never observes it.
1217   if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1218       *call->receiving_buffer != nullptr) {
1219     grpc_byte_buffer_destroy(*call->receiving_buffer);
1220     *call->receiving_buffer = nullptr;
1222   reset_batch_errors(bctl);
1224   if (bctl->completion_data.notify_tag.is_closure) {
1226     bctl->call = nullptr;
1227     /* This closure may be meant to be run within some combiner. Since we aren't
1228      * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
1229      * of GRPC_CLOSURE_RUN.
1231     GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
1233     GRPC_CALL_INTERNAL_UNREF(call, "completion");
// Non-closure path: finish via the completion queue; ownership of
// |error| passes to grpc_cq_end_op.
1236     grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1237                    finish_batch_completion, bctl,
1238                    &bctl->completion_data.cq_completion);
// Records completion of one step of the batch; when the last outstanding
// step finishes, posts the batch completion to the application.
1242 static void finish_batch_step(batch_control* bctl) {
1243   if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1244     post_batch_completion(bctl);
// Pulls slices from the incoming byte stream into the application's
// receive buffer until either the message is complete (remaining == 0)
// or Next() must wait asynchronously (receiving_slice_ready will resume
// the loop). On a Pull() error, drops the stream and buffer and
// completes the recv_message step.
1248 static void continue_receiving_slices(batch_control* bctl) {
1250   grpc_call* call = bctl->call;
1252   size_t remaining = call->receiving_stream->length() -
1253                      (*call->receiving_buffer)->data.raw.slice_buffer.length;
1254   if (remaining == 0) {
1255     call->receiving_message = 0;
1256     call->receiving_stream.reset();
1257     finish_batch_step(bctl);
// Next() returning true means a slice is available synchronously;
// otherwise receiving_slice_ready fires later and re-enters this loop.
1260   if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1261     error = call->receiving_stream->Pull(&call->receiving_slice);
1262     if (error == GRPC_ERROR_NONE) {
1263       grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1264                             call->receiving_slice);
// Pull failed: abandon the partially-received message.
1266       call->receiving_stream.reset();
1267       grpc_byte_buffer_destroy(*call->receiving_buffer);
1268       *call->receiving_buffer = nullptr;
1269       call->receiving_message = 0;
1270       finish_batch_step(bctl);
// Async continuation for continue_receiving_slices(): invoked when the
// byte stream has a slice ready (or failed). Pulls the slice, appends it
// to the receive buffer, and resumes the receive loop; on any error,
// tears down the stream/buffer and completes the recv_message step.
1279 static void receiving_slice_ready(void* bctlp, grpc_error* error) {
1280   batch_control* bctl = static_cast<batch_control*>(bctlp);
1281   grpc_call* call = bctl->call;
// Tracks whether |error| is owned by us (Pull() hands over a ref) or
// borrowed from the closure invocation.
1282   bool release_error = false;
1284   if (error == GRPC_ERROR_NONE) {
1286     error = call->receiving_stream->Pull(&slice);
1287     if (error == GRPC_ERROR_NONE) {
1288       grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1290       continue_receiving_slices(bctl);
1292       /* Error returned by ByteStream::Pull() needs to be released manually */
1293       release_error = true;
1297   if (error != GRPC_ERROR_NONE) {
1298     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1299       GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1301     call->receiving_stream.reset();
1302     grpc_byte_buffer_destroy(*call->receiving_buffer);
1303     *call->receiving_buffer = nullptr;
1304     call->receiving_message = 0;
1305     finish_batch_step(bctl);
1306     if (release_error) {
1307       GRPC_ERROR_UNREF(error);
// Starts consuming a received message once initial metadata has been
// processed. A null receiving_stream means end-of-stream: the app's
// buffer pointer is set to nullptr and the step completes. Otherwise a
// byte buffer is allocated (compressed-aware if the incoming message is
// flagged compressed and a message-compression algorithm was decoded)
// and the slice-receiving loop is kicked off.
1312 static void process_data_after_md(batch_control* bctl) {
1313   grpc_call* call = bctl->call;
1314   if (call->receiving_stream == nullptr) {
1315     *call->receiving_buffer = nullptr;
1316     call->receiving_message = 0;
1317     finish_batch_step(bctl);
1319     call->test_only_last_message_flags = call->receiving_stream->flags();
1320     if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1321         (call->incoming_message_compression_algorithm >
1322          GRPC_MESSAGE_COMPRESS_NONE)) {
1323       grpc_compression_algorithm algo;
1325           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1326               &algo, call->incoming_message_compression_algorithm,
1327               (grpc_stream_compression_algorithm)0));
1328       *call->receiving_buffer =
1329           grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1331       *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1333     GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1334                       grpc_schedule_on_exec_ctx);
1335     continue_receiving_slices(bctl);
// Invoked when the transport has delivered (or failed to deliver) the
// incoming message stream. On error: drops the stream, records the
// first batch error, and cancels the call. Processing of the message is
// deferred until initial metadata has been seen, coordinated through the
// call->recv_state atomic (see receiving_initial_metadata_ready()).
1339 static void receiving_stream_ready(void* bctlp, grpc_error* error) {
1340   batch_control* bctl = static_cast<batch_control*>(bctlp);
1341   grpc_call* call = bctl->call;
1342   if (error != GRPC_ERROR_NONE) {
1343     call->receiving_stream.reset();
// Only the first error is kept in batch_error.
1344     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1346       gpr_atm_rel_store(&bctl->batch_error,
1347                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1349     cancel_with_error(call, GRPC_ERROR_REF(error));
1351   /* If recv_state is RECV_NONE, we will save the batch_control
1352    * object with rel_cas, and will not use it after the cas. Its corresponding
1353    * acq_load is in receiving_initial_metadata_ready() */
// Process immediately when there's an error, end-of-stream, or initial
// metadata already arrived (CAS fails); otherwise park bctl in
// recv_state for receiving_initial_metadata_ready() to pick up.
1354   if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1355       !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
1356     process_data_after_md(bctl);
1360 // The recv_message_ready callback used when sending a batch containing
1361 // a recv_message op down the filter stack. Yields the call combiner
1362 // before processing the received message.
1363 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1364                                                     grpc_error* error) {
1365   batch_control* bctl = static_cast<batch_control*>(bctlp);
1366   grpc_call* call = bctl->call;
// Release the call combiner first: the remaining processing does not
// need to hold it.
1367   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1368   receiving_stream_ready(bctlp, error);
// Cold-path error helper (kept out-of-line via GPR_ATTRIBUTE_NOINLINE):
// a peer may not set both stream and message compression; logs the
// conflict and cancels the call with GRPC_STATUS_INTERNAL.
1371 static void GPR_ATTRIBUTE_NOINLINE
1372 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1373   char* error_msg = nullptr;
1374   gpr_asprintf(&error_msg,
1375                "Incoming stream has both stream compression (%d) and message "
1376                "compression (%d).",
1377                call->incoming_stream_compression_algorithm,
1378                call->incoming_message_compression_algorithm);
1379   gpr_log(GPR_ERROR, "%s", error_msg);
1380   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1381   gpr_free(error_msg);
// Cold-path error helper: the message/stream compression pair could not
// be mapped to a combined compression algorithm; cancels the call with
// GRPC_STATUS_INTERNAL. NOTE(review): unlike its sibling helpers this
// one does not gpr_log before cancelling — confirm whether that is
// intentional.
1384 static void GPR_ATTRIBUTE_NOINLINE
1385 handle_error_parsing_compression_algorithm(grpc_call* call) {
1386   char* error_msg = nullptr;
1387   gpr_asprintf(&error_msg,
1388                "Error in incoming message compression (%d) or stream "
1389                "compression (%d).",
1390                call->incoming_stream_compression_algorithm,
1391                call->incoming_message_compression_algorithm);
1392   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1393   gpr_free(error_msg);
// Cold-path error helper: the decoded compression algorithm value is out
// of range; logs and cancels the call with GRPC_STATUS_UNIMPLEMENTED.
1396 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1397     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1398   char* error_msg = nullptr;
1399   gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1400                compression_algorithm);
1401   gpr_log(GPR_ERROR, "%s", error_msg);
1402   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1403   gpr_free(error_msg);
// Cold-path error helper: the peer used an algorithm that this channel's
// configuration has disabled; logs and cancels the call with
// GRPC_STATUS_UNIMPLEMENTED.
1406 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1407     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1408   char* error_msg = nullptr;
1409   const char* algo_name = nullptr;
1410   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1411   gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1413   gpr_log(GPR_ERROR, "%s", error_msg);
1414   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1415   gpr_free(error_msg);
// Cold-path diagnostic: the incoming algorithm is not in the set we told
// the peer we accept. Unlike the other handlers this one only reports
// (logging call partially elided here) — it does not cancel the call.
1418 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1419     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1420   const char* algo_name = nullptr;
1421   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1423       "Compression algorithm ('%s') not present in the bitset of "
1424       "accepted encodings ('0x%x')",
1425       algo_name, call->encodings_accepted_by_peer);
// Validates the compression configuration decoded from received initial
// metadata: rejects simultaneous stream+message compression, a failed
// message/stream -> combined algorithm mapping, out-of-range values, and
// channel-disabled algorithms (each via its cold-path handler, which
// cancels the call). An algorithm missing from our advertised
// accepted-encodings set is merely traced, not failed.
1428 static void validate_filtered_metadata(batch_control* bctl) {
1429   grpc_compression_algorithm compression_algorithm;
1430   grpc_call* call = bctl->call;
1431   if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1432                        GRPC_STREAM_COMPRESS_NONE &&
1433                    call->incoming_message_compression_algorithm !=
1434                        GRPC_MESSAGE_COMPRESS_NONE)) {
1435     handle_both_stream_and_msg_compression_set(call);
1438           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1439               &compression_algorithm,
1440               call->incoming_message_compression_algorithm,
1441               call->incoming_stream_compression_algorithm) == 0)) {
1442     handle_error_parsing_compression_algorithm(call);
1444     const grpc_compression_options compression_options =
1445         grpc_channel_compression_options(call->channel);
1446     if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1447       handle_invalid_compression(call, compression_algorithm);
1448     } else if (GPR_UNLIKELY(
1449                    grpc_compression_options_is_algorithm_enabled_internal(
1450                        &compression_options, compression_algorithm) == 0)) {
1451       /* check if algorithm is supported by current channel config */
1452       handle_compression_algorithm_disabled(call, compression_algorithm);
1454     /* GRPC_COMPRESS_NONE is always set. */
1455     GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1456     if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
1457                                  compression_algorithm))) {
// Trace-only: a not-advertised algorithm is tolerated, just reported.
1458       if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1459         handle_compression_algorithm_not_accepted(call, compression_algorithm);
// Completion callback for recv_initial_metadata. Filters and validates
// the received metadata, adopts a server-side deadline from the peer,
// records the first batch error / cancels on failure, and then resolves
// the metadata-vs-message ordering race with receiving_stream_ready()
// through the call->recv_state atomic: if a message arrived first, the
// parked batch_control's processing is resumed here.
1465 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
1466   batch_control* bctl = static_cast<batch_control*>(bctlp);
1467   grpc_call* call = bctl->call;
1469   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1471   if (error == GRPC_ERROR_NONE) {
1472     grpc_metadata_batch* md =
1473         &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1474     recv_initial_filter(call, md);
1476     /* TODO(ctiller): this could be moved into recv_initial_filter now */
1477     GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1478     validate_filtered_metadata(bctl);
// Server adopts the client-propagated deadline from the metadata batch.
1480     if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1481       call->send_deadline = md->deadline;
// Error path: keep only the first batch error, then cancel the call.
1484     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1486       gpr_atm_rel_store(&bctl->batch_error,
1487                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1489     cancel_with_error(call, GRPC_ERROR_REF(error));
1492   grpc_closure* saved_rsr_closure = nullptr;
1494     gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1495     /* Should only receive initial metadata once */
1496     GPR_ASSERT(rsr_bctlp != 1);
1497     if (rsr_bctlp == 0) {
1498       /* We haven't seen initial metadata and messages before, thus initial
1499        * metadata is received first.
1500        * no_barrier_cas is used, as this function won't access the batch_control
1501        * object saved by receiving_stream_ready() if the initial metadata is
1502        * received first. */
1503       if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1504                                  RECV_INITIAL_METADATA_FIRST)) {
1508       /* Already received messages */
// recv_state holds the batch_control parked by receiving_stream_ready();
// wrap its resumption in a closure to run after this step completes.
1510           GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1511                               grpc_schedule_on_exec_ctx);
1512       /* No need to modify recv_state */
1516   if (saved_rsr_closure != nullptr) {
1517     GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
1520   finish_batch_step(bctl);
// Completion callback for recv_trailing_metadata: yields the call
// combiner, runs the trailing-metadata filter (which sets the call's
// final status), and completes this batch step.
1523 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1524   batch_control* bctl = static_cast<batch_control*>(bctlp);
1525   grpc_call* call = bctl->call;
1526   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1527   grpc_metadata_batch* md =
1528       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
// recv_trailing_filter takes ownership of the ref'd error.
1529   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1530   finish_batch_step(bctl);
// on_complete callback for the batch's send ops: yields the call
// combiner, records the first batch error, cancels the call on failure,
// and completes this batch step.
1533 static void finish_batch(void* bctlp, grpc_error* error) {
1534   batch_control* bctl = static_cast<batch_control*>(bctlp);
1535   grpc_call* call = bctl->call;
1536   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
// Only the first error is kept in batch_error.
1537   if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1539     gpr_atm_rel_store(&bctl->batch_error,
1540                       reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1542   if (error != GRPC_ERROR_NONE) {
1543     cancel_with_error(call, GRPC_ERROR_REF(error));
1545   finish_batch_step(bctl);
// CQ done-callback for the zero-op batch shortcut: the completion was
// heap-allocated by call_start_batch, so just free it.
1548 static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
1549   gpr_free(completion);
1552 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1553 size_t nops, void* notify_tag,
1554 int is_notify_tag_closure) {
1555 GPR_TIMER_SCOPE("call_start_batch", 0);
1559 batch_control* bctl;
1560 bool has_send_ops = false;
1561 int num_recv_ops = 0;
1562 grpc_call_error error = GRPC_CALL_OK;
1563 grpc_transport_stream_op_batch* stream_op;
1564 grpc_transport_stream_op_batch_payload* stream_op_payload;
1566 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
1569 if (!is_notify_tag_closure) {
1570 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1571 grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1572 free_no_op_completion, nullptr,
1573 static_cast<grpc_cq_completion*>(
1574 gpr_malloc(sizeof(grpc_cq_completion))));
1576 GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
1578 error = GRPC_CALL_OK;
1582 bctl = reuse_or_allocate_batch_control(call, ops, nops);
1583 if (bctl == nullptr) {
1584 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1586 bctl->completion_data.notify_tag.tag = notify_tag;
1587 bctl->completion_data.notify_tag.is_closure =
1588 static_cast<uint8_t>(is_notify_tag_closure != 0);
1590 stream_op = &bctl->op;
1591 stream_op_payload = &call->stream_op_payload;
1593 /* rewrite batch ops into a transport op */
1594 for (i = 0; i < nops; i++) {
1596 if (op->reserved != nullptr) {
1597 error = GRPC_CALL_ERROR;
1598 goto done_with_error;
1601 case GRPC_OP_SEND_INITIAL_METADATA: {
1602 /* Flag validation: currently allow no flags */
1603 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1604 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1605 goto done_with_error;
1607 if (call->sent_initial_metadata) {
1608 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1609 goto done_with_error;
1611 // TODO(juanlishen): If the user has already specified a compression
1612 // algorithm by setting the initial metadata with key of
1613 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
1614 // with the compression algorithm mapped from compression level.
1615 /* process compression level */
1616 grpc_metadata& compression_md = call->compression_md;
1617 compression_md.key = grpc_empty_slice();
1618 compression_md.value = grpc_empty_slice();
1619 compression_md.flags = 0;
1620 size_t additional_metadata_count = 0;
1621 grpc_compression_level effective_compression_level =
1622 GRPC_COMPRESS_LEVEL_NONE;
1623 bool level_set = false;
1624 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1625 effective_compression_level =
1626 op->data.send_initial_metadata.maybe_compression_level.level;
1629 const grpc_compression_options copts =
1630 grpc_channel_compression_options(call->channel);
1631 if (copts.default_level.is_set) {
1633 effective_compression_level = copts.default_level.level;
1636 // Currently, only server side supports compression level setting.
1637 if (level_set && !call->is_client) {
1638 const grpc_compression_algorithm calgo =
1639 compression_algorithm_for_level_locked(
1640 call, effective_compression_level);
1641 // The following metadata will be checked and removed by the message
1642 // compression filter. It will be used as the call's compression
1644 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1645 compression_md.value = grpc_compression_algorithm_slice(calgo);
1646 additional_metadata_count++;
1648 if (op->data.send_initial_metadata.count + additional_metadata_count >
1650 error = GRPC_CALL_ERROR_INVALID_METADATA;
1651 goto done_with_error;
1653 stream_op->send_initial_metadata = true;
1654 call->sent_initial_metadata = true;
1655 if (!prepare_application_metadata(
1656 call, static_cast<int>(op->data.send_initial_metadata.count),
1657 op->data.send_initial_metadata.metadata, 0, call->is_client,
1658 &compression_md, static_cast<int>(additional_metadata_count))) {
1659 error = GRPC_CALL_ERROR_INVALID_METADATA;
1660 goto done_with_error;
1662 /* TODO(ctiller): just make these the same variable? */
1663 if (call->is_client) {
1664 call->metadata_batch[0][0].deadline = call->send_deadline;
1666 stream_op_payload->send_initial_metadata.send_initial_metadata =
1667 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1668 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1670 if (call->is_client) {
1671 stream_op_payload->send_initial_metadata.peer_string =
1674 has_send_ops = true;
1677 case GRPC_OP_SEND_MESSAGE: {
1678 if (!are_write_flags_valid(op->flags)) {
1679 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1680 goto done_with_error;
1682 if (op->data.send_message.send_message == nullptr) {
1683 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1684 goto done_with_error;
1686 if (call->sending_message) {
1687 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1688 goto done_with_error;
1690 uint32_t flags = op->flags;
1691 /* If the outgoing buffer is already compressed, mark it as so in the
1692 flags. These will be picked up by the compression filter and further
1693 (wasteful) attempts at compression skipped. */
1694 if (op->data.send_message.send_message->data.raw.compression >
1695 GRPC_COMPRESS_NONE) {
1696 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1698 stream_op->send_message = true;
1699 call->sending_message = true;
1700 call->sending_stream.Init(
1701 &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1702 stream_op_payload->send_message.send_message.reset(
1703 call->sending_stream.get());
1704 has_send_ops = true;
1707 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1708 /* Flag validation: currently allow no flags */
1709 if (op->flags != 0) {
1710 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1711 goto done_with_error;
1713 if (!call->is_client) {
1714 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1715 goto done_with_error;
1717 if (call->sent_final_op) {
1718 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1719 goto done_with_error;
1721 stream_op->send_trailing_metadata = true;
1722 call->sent_final_op = true;
1723 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1724 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1725 has_send_ops = true;
1728 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1729 /* Flag validation: currently allow no flags */
1730 if (op->flags != 0) {
1731 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1732 goto done_with_error;
1734 if (call->is_client) {
1735 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1736 goto done_with_error;
1738 if (call->sent_final_op) {
1739 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1740 goto done_with_error;
1742 if (op->data.send_status_from_server.trailing_metadata_count >
1744 error = GRPC_CALL_ERROR_INVALID_METADATA;
1745 goto done_with_error;
1747 stream_op->send_trailing_metadata = true;
1748 call->sent_final_op = true;
1749 GPR_ASSERT(call->send_extra_metadata_count == 0);
1750 call->send_extra_metadata_count = 1;
1751 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
1752 call->channel, op->data.send_status_from_server.status);
1753 grpc_error* status_error =
1754 op->data.send_status_from_server.status == GRPC_STATUS_OK
1756 : grpc_error_set_int(
1757 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1758 "Server returned error"),
1759 GRPC_ERROR_INT_GRPC_STATUS,
1760 static_cast<intptr_t>(
1761 op->data.send_status_from_server.status));
1762 if (op->data.send_status_from_server.status_details != nullptr) {
1763 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1764 GRPC_MDSTR_GRPC_MESSAGE,
1765 grpc_slice_ref_internal(
1766 *op->data.send_status_from_server.status_details));
1767 call->send_extra_metadata_count++;
1768 if (status_error != GRPC_ERROR_NONE) {
1769 char* msg = grpc_slice_to_c_string(
1770 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1772 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1773 grpc_slice_from_copied_string(msg));
1778 gpr_atm_rel_store(&call->status_error,
1779 reinterpret_cast<gpr_atm>(status_error));
1780 if (!prepare_application_metadata(
1783 op->data.send_status_from_server.trailing_metadata_count),
1784 op->data.send_status_from_server.trailing_metadata, 1, 1,
1786 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1787 GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1789 call->send_extra_metadata_count = 0;
1790 error = GRPC_CALL_ERROR_INVALID_METADATA;
1791 goto done_with_error;
1793 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1794 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1795 has_send_ops = true;
1798 case GRPC_OP_RECV_INITIAL_METADATA: {
1799 /* Flag validation: currently allow no flags */
1800 if (op->flags != 0) {
1801 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1802 goto done_with_error;
1804 if (call->received_initial_metadata) {
1805 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1806 goto done_with_error;
1808 call->received_initial_metadata = true;
1809 call->buffered_metadata[0] =
1810 op->data.recv_initial_metadata.recv_initial_metadata;
1811 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1812 receiving_initial_metadata_ready, bctl,
1813 grpc_schedule_on_exec_ctx);
1814 stream_op->recv_initial_metadata = true;
/* NOTE(review): continuation of call_start_batch() — the receive-side cases
   of the per-op dispatch switch, batch-completion wiring, and the
   done_with_error unwind path.  The function, switch header, and several
   brace-only lines open/close outside this excerpt. */
/* Route incoming initial metadata into the call's receive-side (index 1),
   non-trailing (index 0) metadata slot, and run
   receiving_initial_metadata_ready when it lands. */
1815 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1816     &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1817 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1818     &call->receiving_initial_metadata_ready;
/* Server-side calls also ask the transport to fill in the peer string. */
1819 if (!call->is_client) {
1820 stream_op_payload->recv_initial_metadata.peer_string =
1826 case GRPC_OP_RECV_MESSAGE: {
1827 /* Flag validation: currently allow no flags */
1828 if (op->flags != 0) {
1829 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1830 goto done_with_error;
/* Only one RECV_MESSAGE may be outstanding on a call at a time. */
1832 if (call->receiving_message) {
1833 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1834 goto done_with_error;
1836 call->receiving_message = true;
1837 stream_op->recv_message = true;
/* Remember the application-supplied destination buffer; the transport
   delivers into receiving_stream, and receiving_stream_ready_in_call_combiner
   fires (under the call combiner) once a message is available. */
1838 call->receiving_buffer = op->data.recv_message.recv_message;
1839 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1840 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1841 receiving_stream_ready_in_call_combiner, bctl,
1842 grpc_schedule_on_exec_ctx);
1843 stream_op_payload->recv_message.recv_message_ready =
1844 &call->receiving_stream_ready;
1848 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1849 /* Flag validation: currently allow no flags */
1850 if (op->flags != 0) {
1851 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1852 goto done_with_error;
/* Receiving final status is a client-only operation. */
1854 if (!call->is_client) {
1855 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1856 goto done_with_error;
/* The final op (status / close) may be requested at most once per call. */
1858 if (call->requested_final_op) {
1859 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1860 goto done_with_error;
1862 call->requested_final_op = true;
/* Stash the application's out-parameters; they are populated when
   trailing metadata arrives. buffered_metadata[1] is the trailing slot. */
1863 call->buffered_metadata[1] =
1864 op->data.recv_status_on_client.trailing_metadata;
1865 call->final_op.client.status = op->data.recv_status_on_client.status;
1866 call->final_op.client.status_details =
1867 op->data.recv_status_on_client.status_details;
1868 call->final_op.client.error_string =
1869 op->data.recv_status_on_client.error_string;
/* Trailing metadata goes to the receive-side (1), trailing (1) slot; the
   transport also records per-stream stats alongside it. */
1870 stream_op->recv_trailing_metadata = true;
1871 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1872 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1873 stream_op_payload->recv_trailing_metadata.collect_stats =
1874 &call->final_info.stats.transport_stream_stats;
1875 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1876 receiving_trailing_metadata_ready, bctl,
1877 grpc_schedule_on_exec_ctx);
1878 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1879 &call->receiving_trailing_metadata_ready;
1883 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1884 /* Flag validation: currently allow no flags */
1885 if (op->flags != 0) {
1886 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1887 goto done_with_error;
/* Receiving close is a server-only operation. */
1889 if (call->is_client) {
1890 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1891 goto done_with_error;
1893 if (call->requested_final_op) {
1894 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1895 goto done_with_error;
1897 call->requested_final_op = true;
1898 call->final_op.server.cancelled =
1899 op->data.recv_close_on_server.cancelled;
/* Same trailing-metadata plumbing as RECV_STATUS_ON_CLIENT above. */
1900 stream_op->recv_trailing_metadata = true;
1901 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1902 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1903 stream_op_payload->recv_trailing_metadata.collect_stats =
1904 &call->final_info.stats.transport_stream_stats;
1905 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1906 receiving_trailing_metadata_ready, bctl,
1907 grpc_schedule_on_exec_ctx);
1908 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1909 &call->receiving_trailing_metadata_ready;
/* All ops validated: take a ref on the call for the pending completion, and
   (for tag-based notification) reserve a slot on the completion queue. */
1916 GRPC_CALL_INTERNAL_REF(call, "completion");
1917 if (!is_notify_tag_closure) {
1918 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
/* The batch completes after one step for all send ops combined plus one
   step per receive op. */
1920 bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);
1923 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1924 grpc_schedule_on_exec_ctx);
1925 stream_op->on_complete = &bctl->finish_batch;
/* Release-store so cancellation paths observe that ops were sent, then hand
   the batch to the transport via the call combiner. */
1928 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1929 execute_batch(call, stream_op, &bctl->start_batch);
/* done_with_error unwind (label is above, outside this excerpt): roll back
   every per-call flag and metadata mutation made while assembling the
   batch, so the caller may retry with a corrected batch. */
1935 /* reverse any mutations that occurred */
1936 if (stream_op->send_initial_metadata) {
1937 call->sent_initial_metadata = false;
1938 grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1940 if (stream_op->send_message) {
1941 call->sending_message = false;
/* Drop the byte stream created for the aborted send. */
1942 call->sending_stream->Orphan();
1944 if (stream_op->send_trailing_metadata) {
1945 call->sent_final_op = false;
1946 grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1948 if (stream_op->recv_initial_metadata) {
1949 call->received_initial_metadata = false;
1951 if (stream_op->recv_message) {
1952 call->receiving_message = false;
1954 if (stream_op->recv_trailing_metadata) {
1955 call->requested_final_op = false;
/* Public API entry point: starts a batch of |nops| operations on |call|,
   completing on |call|'s completion queue with |tag|.  Thin wrapper around
   the internal call_start_batch() that validates |reserved| and establishes
   the exec-ctx scopes required before entering core. */
1960 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1961 size_t nops, void* tag, void* reserved) {
1962 grpc_call_error err;
1965 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1967 5, (call, ops, (unsigned long)nops, tag, reserved));
/* |reserved| must be nullptr per the public API contract. */
1969 if (reserved != nullptr) {
1970 err = GRPC_CALL_ERROR;
/* Scoped exec contexts: flushed when they go out of scope, before return. */
1972 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1973 grpc_core::ExecCtx exec_ctx;
/* Final arg 0: |tag| is a CQ tag, not a closure. */
1974 err = call_start_batch(call, ops, nops, tag, 0);
/* Internal variant of grpc_call_start_batch: on completion, |closure| is
   scheduled directly instead of posting a tag to a completion queue
   (final arg 1 marks the notify tag as a closure).  Caller must already be
   inside an ExecCtx. */
1980 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1983 grpc_closure* closure) {
1984 return call_start_batch(call, ops, nops, closure, 1);
/* Attaches |value| to |call| under context slot |elem|, registering
   |destroy| to be invoked on the value when it is replaced or the call is
   destroyed.  Any previously-set value in the slot is destroyed first. */
1987 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1988 void* value, void (*destroy)(void* value)) {
/* Release the prior entry (if any) before overwriting the slot. */
1989 if (call->context[elem].destroy) {
1990 call->context[elem].destroy(call->context[elem].value);
1992 call->context[elem].value = value;
1993 call->context[elem].destroy = destroy;
/* Returns the value stored in |call|'s context slot |elem| (nullptr if the
   slot was never set).  Ownership stays with the call. */
1996 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1997 return call->context[elem].value;
2000 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
/* Test-only helper: maps an abstract compression |level| to the concrete
   compression algorithm this call would use, as negotiated with the peer.
   NOTE(review): despite the historical "_locked" suffix, the helper's
   locking requirements are established where it is defined, above this
   excerpt — confirm against its definition. */
2002 grpc_compression_algorithm grpc_call_compression_for_level(
2003 grpc_call* call, grpc_compression_level level) {
2004 grpc_compression_algorithm algo =
2005 compression_algorithm_for_level_locked(call, level);
2009 const char* grpc_call_error_to_string(grpc_call_error error) {
2011 case GRPC_CALL_ERROR:
2012 return "GRPC_CALL_ERROR";
2013 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2014 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2015 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2016 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2017 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2018 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2019 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2020 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2021 case GRPC_CALL_ERROR_INVALID_FLAGS:
2022 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2023 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2024 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2025 case GRPC_CALL_ERROR_INVALID_METADATA:
2026 return "GRPC_CALL_ERROR_INVALID_METADATA";
2027 case GRPC_CALL_ERROR_NOT_INVOKED:
2028 return "GRPC_CALL_ERROR_NOT_INVOKED";
2029 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2030 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2031 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2032 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2033 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2034 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2035 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2036 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2037 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2038 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2039 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2040 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2042 return "GRPC_CALL_OK";
2044 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");