//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
17 #include <grpc/support/port_platform.h>
19 #include "src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h"
23 #include <grpc/support/alloc.h>
25 #include "src/core/ext/filters/workarounds/workaround_utils.h"
26 #include "src/core/lib/channel/channel_stack_builder.h"
27 #include "src/core/lib/surface/channel_init.h"
28 #include "src/core/lib/transport/metadata.h"
32 // Receive closures are chained: we inject this closure as the
33 // recv_initial_metadata_ready up-call on transport_stream_op, and remember to
34 // call our next_recv_initial_metadata_ready member after handling it.
35 grpc_closure recv_initial_metadata_ready;
36 // Used by recv_initial_metadata_ready.
37 grpc_metadata_batch* recv_initial_metadata;
38 // Original recv_initial_metadata_ready callback, invoked after our own.
39 grpc_closure* next_recv_initial_metadata_ready;
41 // Marks whether the workaround is active
42 bool workaround_active;
46 // Find the user agent metadata element in the batch
47 static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,
49 if (batch->idx.named.user_agent != nullptr) {
50 *md = batch->idx.named.user_agent->md;
56 // Callback invoked when we receive an initial metadata.
57 static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
58 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
59 call_data* calld = static_cast<call_data*>(elem->call_data);
61 if (GRPC_ERROR_NONE == error) {
63 if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) {
64 grpc_workaround_user_agent_md* user_agent_md = grpc_parse_user_agent(md);
66 ->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) {
67 calld->workaround_active = true;
72 // Invoke the next callback.
73 GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
74 GRPC_ERROR_REF(error));
77 // Start transport stream op.
78 static void start_transport_stream_op_batch(
79 grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
80 call_data* calld = static_cast<call_data*>(elem->call_data);
82 // Inject callback for receiving initial metadata
83 if (op->recv_initial_metadata) {
84 calld->next_recv_initial_metadata_ready =
85 op->payload->recv_initial_metadata.recv_initial_metadata_ready;
86 op->payload->recv_initial_metadata.recv_initial_metadata_ready =
87 &calld->recv_initial_metadata_ready;
88 calld->recv_initial_metadata =
89 op->payload->recv_initial_metadata.recv_initial_metadata;
92 if (op->send_message) {
93 /* Send message happens after client's user-agent (initial metadata) is
94 * received, so workaround_active must be set already */
95 if (calld->workaround_active) {
96 op->payload->send_message.send_message->set_flags(
97 op->payload->send_message.send_message->flags() |
98 GRPC_WRITE_NO_COMPRESS);
102 // Chain to the next filter.
103 grpc_call_next_op(elem, op);
106 // Constructor for call_data.
107 static grpc_error* init_call_elem(grpc_call_element* elem,
108 const grpc_call_element_args* args) {
109 call_data* calld = static_cast<call_data*>(elem->call_data);
110 calld->next_recv_initial_metadata_ready = nullptr;
111 calld->workaround_active = false;
112 GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
113 recv_initial_metadata_ready, elem,
114 grpc_schedule_on_exec_ctx);
115 return GRPC_ERROR_NONE;
118 // Destructor for call_data.
119 static void destroy_call_elem(grpc_call_element* elem,
120 const grpc_call_final_info* final_info,
121 grpc_closure* ignored) {}
123 // Constructor for channel_data.
124 static grpc_error* init_channel_elem(grpc_channel_element* elem,
125 grpc_channel_element_args* args) {
126 return GRPC_ERROR_NONE;
129 // Destructor for channel_data.
130 static void destroy_channel_elem(grpc_channel_element* elem) {}
132 // Parse the user agent
133 static bool parse_user_agent(grpc_mdelem md) {
134 const char grpc_objc_specifier[] = "grpc-objc/";
135 const size_t grpc_objc_specifier_len = sizeof(grpc_objc_specifier) - 1;
136 const char cronet_specifier[] = "cronet_http";
137 const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1;
139 char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
140 bool grpc_objc_specifier_seen = false;
141 bool cronet_specifier_seen = false;
142 char *major_version_str = user_agent_str, *minor_version_str;
143 long major_version = 0, minor_version = 0;
145 char* head = strtok(user_agent_str, " ");
146 while (head != nullptr) {
147 if (!grpc_objc_specifier_seen &&
148 0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) {
149 major_version_str = head + grpc_objc_specifier_len;
150 grpc_objc_specifier_seen = true;
151 } else if (grpc_objc_specifier_seen &&
152 0 == strncmp(head, cronet_specifier, cronet_specifier_len)) {
153 cronet_specifier_seen = true;
157 head = strtok(nullptr, " ");
159 if (grpc_objc_specifier_seen) {
160 major_version_str = strtok(major_version_str, ".");
161 minor_version_str = strtok(nullptr, ".");
162 major_version = atol(major_version_str);
163 minor_version = atol(minor_version_str);
166 gpr_free(user_agent_str);
167 return (grpc_objc_specifier_seen && cronet_specifier_seen &&
168 (major_version < 1 || (major_version == 1 && minor_version <= 3)));
171 const grpc_channel_filter grpc_workaround_cronet_compression_filter = {
172 start_transport_stream_op_batch,
173 grpc_channel_next_op,
176 grpc_call_stack_ignore_set_pollset_or_pollset_set,
180 destroy_channel_elem,
181 grpc_channel_next_get_info,
182 "workaround_cronet_compression"};
184 static bool register_workaround_cronet_compression(
185 grpc_channel_stack_builder* builder, void* arg) {
186 const grpc_channel_args* channel_args =
187 grpc_channel_stack_builder_get_channel_arguments(builder);
188 const grpc_arg* a = grpc_channel_args_find(
189 channel_args, GRPC_ARG_WORKAROUND_CRONET_COMPRESSION);
193 if (grpc_channel_arg_get_bool(a, false) == false) {
196 return grpc_channel_stack_builder_prepend_filter(
197 builder, &grpc_workaround_cronet_compression_filter, nullptr, nullptr);
200 void grpc_workaround_cronet_compression_filter_init(void) {
201 grpc_channel_init_register_stage(
202 GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH,
203 register_workaround_cronet_compression, nullptr);
204 grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION,
// Plugin shutdown hook. Intentionally a no-op.
void grpc_workaround_cronet_compression_filter_shutdown(void) {
}