3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
24 #include <grpc/compression.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
29 #include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/compression/algorithm_metadata.h"
32 #include "src/core/lib/compression/compression_args.h"
33 #include "src/core/lib/compression/compression_internal.h"
34 #include "src/core/lib/compression/message_compress.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/manual_constructor.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 #include "src/core/lib/slice/slice_string_helpers.h"
40 #include "src/core/lib/surface/call.h"
41 #include "src/core/lib/transport/static_metadata.h"
43 static void start_send_message_batch(void* arg, grpc_error* unused);
44 static void send_message_on_complete(void* arg, grpc_error* error);
45 static void on_send_message_next_done(void* arg, grpc_error* error);
50 /** The default, channel-level, compression algorithm */
51 grpc_compression_algorithm default_compression_algorithm;
52 /** Bitset of enabled compression algorithms */
53 uint32_t enabled_compression_algorithms_bitset;
54 /** Bitset of enabled message compression algorithms */
55 uint32_t enabled_message_compression_algorithms_bitset;
56 /** Bitset of enabled stream compression algorithms */
57 uint32_t enabled_stream_compression_algorithms_bitset;
61 call_data(grpc_call_element* elem, const grpc_call_element_args& args)
62 : call_combiner(args.call_combiner) {
63 channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
64 // The call's message compression algorithm is set to channel's default
65 // setting. It can be overridden later by initial metadata.
66 if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
67 channeld->default_compression_algorithm))) {
68 message_compression_algorithm =
69 grpc_compression_algorithm_to_message_compression_algorithm(
70 channeld->default_compression_algorithm);
72 GRPC_CLOSURE_INIT(&start_send_message_batch_in_call_combiner,
73 start_send_message_batch, elem,
74 grpc_schedule_on_exec_ctx);
78 if (state_initialized) {
79 grpc_slice_buffer_destroy_internal(&slices);
81 GRPC_ERROR_UNREF(cancel_error);
84 grpc_core::CallCombiner* call_combiner;
85 grpc_message_compression_algorithm message_compression_algorithm =
86 GRPC_MESSAGE_COMPRESS_NONE;
87 grpc_error* cancel_error = GRPC_ERROR_NONE;
88 grpc_transport_stream_op_batch* send_message_batch = nullptr;
89 bool seen_initial_metadata = false;
90 /* Set to true, if the fields below are initialized. */
91 bool state_initialized = false;
92 grpc_closure start_send_message_batch_in_call_combiner;
93 /* The fields below are only initialized when we compress the payload.
94 * Keep them at the bottom of the struct, so they don't pollute the
96 grpc_linked_mdelem message_compression_algorithm_storage;
97 grpc_linked_mdelem stream_compression_algorithm_storage;
98 grpc_linked_mdelem accept_encoding_storage;
99 grpc_linked_mdelem accept_stream_encoding_storage;
100 grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
101 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
103 grpc_closure* original_send_message_on_complete;
104 grpc_closure send_message_on_complete;
105 grpc_closure on_send_message_next_done;
110 // Returns true if we should skip message compression for the current message.
111 static bool skip_message_compression(grpc_call_element* elem) {
112 call_data* calld = static_cast<call_data*>(elem->call_data);
113 // If the flags of this message indicate that it shouldn't be compressed, we
114 // skip message compression.
116 calld->send_message_batch->payload->send_message.send_message->flags();
117 if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
120 // If this call doesn't have any message compression algorithm set, skip
121 // message compression.
122 return calld->message_compression_algorithm == GRPC_MESSAGE_COMPRESS_NONE;
125 // Determines the compression algorithm from the initial metadata and the
126 // channel's default setting.
127 static grpc_compression_algorithm find_compression_algorithm(
128 grpc_metadata_batch* initial_metadata, channel_data* channeld) {
129 if (initial_metadata->idx.named.grpc_internal_encoding_request == nullptr) {
130 return channeld->default_compression_algorithm;
132 grpc_compression_algorithm compression_algorithm;
133 // Parse the compression algorithm from the initial metadata.
135 initial_metadata->idx.named.grpc_internal_encoding_request->md;
136 GPR_ASSERT(grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
137 &compression_algorithm));
138 // Remove this metadata since it's an internal one (i.e., it won't be
140 grpc_metadata_batch_remove(initial_metadata,
141 GRPC_BATCH_GRPC_INTERNAL_ENCODING_REQUEST);
142 // Check if that algorithm is enabled. Note that GRPC_COMPRESS_NONE is always
144 // TODO(juanlishen): Maybe use channel default or abort() if the algorithm
145 // from the initial metadata is disabled.
146 if (GPR_LIKELY(GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
147 compression_algorithm))) {
148 return compression_algorithm;
150 const char* algorithm_name;
152 grpc_compression_algorithm_name(compression_algorithm, &algorithm_name));
154 "Invalid compression algorithm from initial metadata: '%s' "
155 "(previously disabled). "
156 "Will not compress.",
158 return GRPC_COMPRESS_NONE;
161 static void initialize_state(grpc_call_element* elem, call_data* calld) {
162 GPR_DEBUG_ASSERT(!calld->state_initialized);
163 calld->state_initialized = true;
164 grpc_slice_buffer_init(&calld->slices);
165 GRPC_CLOSURE_INIT(&calld->send_message_on_complete,
166 ::send_message_on_complete, elem,
167 grpc_schedule_on_exec_ctx);
168 GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
169 ::on_send_message_next_done, elem,
170 grpc_schedule_on_exec_ctx);
173 static grpc_error* process_send_initial_metadata(
174 grpc_call_element* elem,
175 grpc_metadata_batch* initial_metadata) GRPC_MUST_USE_RESULT;
176 static grpc_error* process_send_initial_metadata(
177 grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
178 call_data* calld = static_cast<call_data*>(elem->call_data);
179 channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
180 // Find the compression algorithm.
181 grpc_compression_algorithm compression_algorithm =
182 find_compression_algorithm(initial_metadata, channeld);
183 // Note that at most one of the following algorithms can be set.
184 calld->message_compression_algorithm =
185 grpc_compression_algorithm_to_message_compression_algorithm(
186 compression_algorithm);
187 grpc_stream_compression_algorithm stream_compression_algorithm =
188 grpc_compression_algorithm_to_stream_compression_algorithm(
189 compression_algorithm);
190 // Hint compression algorithm.
191 grpc_error* error = GRPC_ERROR_NONE;
192 if (calld->message_compression_algorithm != GRPC_MESSAGE_COMPRESS_NONE) {
193 initialize_state(elem, calld);
194 error = grpc_metadata_batch_add_tail(
195 initial_metadata, &calld->message_compression_algorithm_storage,
196 grpc_message_compression_encoding_mdelem(
197 calld->message_compression_algorithm),
198 GRPC_BATCH_GRPC_ENCODING);
199 } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
200 initialize_state(elem, calld);
201 error = grpc_metadata_batch_add_tail(
202 initial_metadata, &calld->stream_compression_algorithm_storage,
203 grpc_stream_compression_encoding_mdelem(stream_compression_algorithm),
204 GRPC_BATCH_CONTENT_ENCODING);
206 if (error != GRPC_ERROR_NONE) return error;
207 // Convey supported compression algorithms.
208 error = grpc_metadata_batch_add_tail(
209 initial_metadata, &calld->accept_encoding_storage,
210 GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
211 channeld->enabled_message_compression_algorithms_bitset),
212 GRPC_BATCH_GRPC_ACCEPT_ENCODING);
213 if (error != GRPC_ERROR_NONE) return error;
214 // Do not overwrite accept-encoding header if it already presents (e.g., added
216 if (!initial_metadata->idx.named.accept_encoding) {
217 error = grpc_metadata_batch_add_tail(
218 initial_metadata, &calld->accept_stream_encoding_storage,
219 GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
220 channeld->enabled_stream_compression_algorithms_bitset),
221 GRPC_BATCH_ACCEPT_ENCODING);
226 static void send_message_on_complete(void* arg, grpc_error* error) {
227 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
228 call_data* calld = static_cast<call_data*>(elem->call_data);
229 grpc_slice_buffer_reset_and_unref_internal(&calld->slices);
230 GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
231 GRPC_ERROR_REF(error));
234 static void send_message_batch_continue(grpc_call_element* elem) {
235 call_data* calld = static_cast<call_data*>(elem->call_data);
236 // Note: The call to grpc_call_next_op() results in yielding the
237 // call combiner, so we need to clear calld->send_message_batch
238 // before we do that.
239 grpc_transport_stream_op_batch* send_message_batch =
240 calld->send_message_batch;
241 calld->send_message_batch = nullptr;
242 grpc_call_next_op(elem, send_message_batch);
245 static void finish_send_message(grpc_call_element* elem) {
246 call_data* calld = static_cast<call_data*>(elem->call_data);
247 GPR_DEBUG_ASSERT(calld->message_compression_algorithm !=
248 GRPC_MESSAGE_COMPRESS_NONE);
249 // Compress the data if appropriate.
250 grpc_slice_buffer tmp;
251 grpc_slice_buffer_init(&tmp);
252 uint32_t send_flags =
253 calld->send_message_batch->payload->send_message.send_message->flags();
254 bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
255 &calld->slices, &tmp);
257 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
258 const char* algo_name;
259 const size_t before_size = calld->slices.length;
260 const size_t after_size = tmp.length;
261 const float savings_ratio = 1.0f - static_cast<float>(after_size) /
262 static_cast<float>(before_size);
263 GPR_ASSERT(grpc_message_compression_algorithm_name(
264 calld->message_compression_algorithm, &algo_name));
266 "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
267 " bytes (%.2f%% savings)",
268 algo_name, before_size, after_size, 100 * savings_ratio);
270 grpc_slice_buffer_swap(&calld->slices, &tmp);
271 send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
273 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
274 const char* algo_name;
275 GPR_ASSERT(grpc_message_compression_algorithm_name(
276 calld->message_compression_algorithm, &algo_name));
278 "Algorithm '%s' enabled but decided not to compress. Input size: "
280 algo_name, calld->slices.length);
283 grpc_slice_buffer_destroy_internal(&tmp);
284 // Swap out the original byte stream with our new one and send the
286 calld->replacement_stream.Init(&calld->slices, send_flags);
287 calld->send_message_batch->payload->send_message.send_message.reset(
288 calld->replacement_stream.get());
289 calld->original_send_message_on_complete =
290 calld->send_message_batch->on_complete;
291 calld->send_message_batch->on_complete = &calld->send_message_on_complete;
292 send_message_batch_continue(elem);
295 static void fail_send_message_batch_in_call_combiner(void* arg,
297 call_data* calld = static_cast<call_data*>(arg);
298 if (calld->send_message_batch != nullptr) {
299 grpc_transport_stream_op_batch_finish_with_failure(
300 calld->send_message_batch, GRPC_ERROR_REF(error), calld->call_combiner);
301 calld->send_message_batch = nullptr;
305 // Pulls a slice from the send_message byte stream and adds it to calld->slices.
306 static grpc_error* pull_slice_from_send_message(call_data* calld) {
307 grpc_slice incoming_slice;
309 calld->send_message_batch->payload->send_message.send_message->Pull(
311 if (error == GRPC_ERROR_NONE) {
312 grpc_slice_buffer_add(&calld->slices, incoming_slice);
317 // Reads as many slices as possible from the send_message byte stream.
318 // If all data has been read, invokes finish_send_message(). Otherwise,
319 // an async call to ByteStream::Next() has been started, which will
320 // eventually result in calling on_send_message_next_done().
321 static void continue_reading_send_message(grpc_call_element* elem) {
322 call_data* calld = static_cast<call_data*>(elem->call_data);
323 while (calld->send_message_batch->payload->send_message.send_message->Next(
324 ~static_cast<size_t>(0), &calld->on_send_message_next_done)) {
325 grpc_error* error = pull_slice_from_send_message(calld);
326 if (error != GRPC_ERROR_NONE) {
327 // Closure callback; does not take ownership of error.
328 fail_send_message_batch_in_call_combiner(calld, error);
329 GRPC_ERROR_UNREF(error);
332 if (calld->slices.length == calld->send_message_batch->payload->send_message
333 .send_message->length()) {
334 finish_send_message(elem);
340 // Async callback for ByteStream::Next().
341 static void on_send_message_next_done(void* arg, grpc_error* error) {
342 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
343 call_data* calld = static_cast<call_data*>(elem->call_data);
344 if (error != GRPC_ERROR_NONE) {
345 // Closure callback; does not take ownership of error.
346 fail_send_message_batch_in_call_combiner(calld, error);
349 error = pull_slice_from_send_message(calld);
350 if (error != GRPC_ERROR_NONE) {
351 // Closure callback; does not take ownership of error.
352 fail_send_message_batch_in_call_combiner(calld, error);
353 GRPC_ERROR_UNREF(error);
356 if (calld->slices.length ==
357 calld->send_message_batch->payload->send_message.send_message->length()) {
358 finish_send_message(elem);
360 continue_reading_send_message(elem);
364 static void start_send_message_batch(void* arg, grpc_error* unused) {
365 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
366 if (skip_message_compression(elem)) {
367 send_message_batch_continue(elem);
369 continue_reading_send_message(elem);
373 static void compress_start_transport_stream_op_batch(
374 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
375 GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
376 call_data* calld = static_cast<call_data*>(elem->call_data);
377 // Handle cancel_stream.
378 if (batch->cancel_stream) {
379 GRPC_ERROR_UNREF(calld->cancel_error);
380 calld->cancel_error =
381 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
382 if (calld->send_message_batch != nullptr) {
383 if (!calld->seen_initial_metadata) {
384 GRPC_CALL_COMBINER_START(
385 calld->call_combiner,
386 GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
387 grpc_schedule_on_exec_ctx),
388 GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
390 calld->send_message_batch->payload->send_message.send_message->Shutdown(
391 GRPC_ERROR_REF(calld->cancel_error));
394 } else if (calld->cancel_error != GRPC_ERROR_NONE) {
395 grpc_transport_stream_op_batch_finish_with_failure(
396 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
399 // Handle send_initial_metadata.
400 if (batch->send_initial_metadata) {
401 GPR_ASSERT(!calld->seen_initial_metadata);
402 grpc_error* error = process_send_initial_metadata(
403 elem, batch->payload->send_initial_metadata.send_initial_metadata);
404 if (error != GRPC_ERROR_NONE) {
405 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
406 calld->call_combiner);
409 calld->seen_initial_metadata = true;
410 // If we had previously received a batch containing a send_message op,
411 // handle it now. Note that we need to re-enter the call combiner
412 // for this, since we can't send two batches down while holding the
413 // call combiner, since the connected_channel filter (at the bottom of
414 // the call stack) will release the call combiner for each batch it sees.
415 if (calld->send_message_batch != nullptr) {
416 GRPC_CALL_COMBINER_START(
417 calld->call_combiner,
418 &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
419 "starting send_message after send_initial_metadata");
422 // Handle send_message.
423 if (batch->send_message) {
424 GPR_ASSERT(calld->send_message_batch == nullptr);
425 calld->send_message_batch = batch;
426 // If we have not yet seen send_initial_metadata, then we have to
427 // wait. We save the batch in calld and then drop the call
428 // combiner, which we'll have to pick up again later when we get
429 // send_initial_metadata.
430 if (!calld->seen_initial_metadata) {
431 GRPC_CALL_COMBINER_STOP(
432 calld->call_combiner,
433 "send_message batch pending send_initial_metadata");
436 start_send_message_batch(elem, GRPC_ERROR_NONE);
438 // Pass control down the stack.
439 grpc_call_next_op(elem, batch);
443 /* Constructor for call_data */
444 static grpc_error* init_call_elem(grpc_call_element* elem,
445 const grpc_call_element_args* args) {
446 new (elem->call_data) call_data(elem, *args);
447 return GRPC_ERROR_NONE;
450 /* Destructor for call_data */
451 static void destroy_call_elem(grpc_call_element* elem,
452 const grpc_call_final_info* final_info,
453 grpc_closure* ignored) {
454 call_data* calld = static_cast<call_data*>(elem->call_data);
458 /* Constructor for channel_data */
459 static grpc_error* init_channel_elem(grpc_channel_element* elem,
460 grpc_channel_element_args* args) {
461 channel_data* channeld = static_cast<channel_data*>(elem->channel_data);
462 // Get the enabled and the default algorithms from channel args.
463 channeld->enabled_compression_algorithms_bitset =
464 grpc_channel_args_compression_algorithm_get_states(args->channel_args);
465 channeld->default_compression_algorithm =
466 grpc_channel_args_get_channel_default_compression_algorithm(
468 // Make sure the default is enabled.
469 if (!GPR_BITGET(channeld->enabled_compression_algorithms_bitset,
470 channeld->default_compression_algorithm)) {
472 GPR_ASSERT(grpc_compression_algorithm_name(
473 channeld->default_compression_algorithm, &name) == 1);
475 "default compression algorithm %s not enabled: switching to none",
477 channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
479 channeld->enabled_message_compression_algorithms_bitset =
480 grpc_compression_bitset_to_message_bitset(
481 channeld->enabled_compression_algorithms_bitset);
482 channeld->enabled_stream_compression_algorithms_bitset =
483 grpc_compression_bitset_to_stream_bitset(
484 channeld->enabled_compression_algorithms_bitset);
485 GPR_ASSERT(!args->is_last);
486 return GRPC_ERROR_NONE;
489 /* Destructor for channel data */
490 static void destroy_channel_elem(grpc_channel_element* elem) {}
492 const grpc_channel_filter grpc_message_compress_filter = {
493 compress_start_transport_stream_op_batch,
494 grpc_channel_next_op,
497 grpc_call_stack_ignore_set_pollset_or_pollset_set,
499 sizeof(channel_data),
501 destroy_channel_elem,
502 grpc_channel_next_get_info,