Built motion from commit 6a09e18b.|2.6.11
[motion2.git] / legacy-libs / grpc-cloned / deps / grpc / src / core / lib / iomgr / cfstream_handle.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/cfstream_handle.h"
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/atm.h>
30 #include <grpc/support/sync.h>
31
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/exec_ctx.h"
36
37 extern grpc_core::TraceFlag grpc_tcp_trace;
38
39 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
40
41 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
42
43 void* CFStreamHandle::Retain(void* info) {
44   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
45   CFSTREAM_HANDLE_REF(handle, "retain");
46   return info;
47 }
48
49 void CFStreamHandle::Release(void* info) {
50   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
51   CFSTREAM_HANDLE_UNREF(handle, "release");
52 }
53
54 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
55     CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
56   return grpc_core::New<CFStreamHandle>(read_stream, write_stream);
57 }
58
59 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
60                                   CFStreamEventType type,
61                                   void* client_callback_info) {
62   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
63   grpc_core::ExecCtx exec_ctx;
64   grpc_error* error;
65   CFErrorRef stream_error;
66   CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
67   if (grpc_tcp_trace.enabled()) {
68     gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
69             stream, type, client_callback_info);
70   }
71   switch (type) {
72     case kCFStreamEventOpenCompleted:
73       handle->open_event_.SetReady();
74       break;
75     case kCFStreamEventHasBytesAvailable:
76     case kCFStreamEventEndEncountered:
77       handle->read_event_.SetReady();
78       break;
79     case kCFStreamEventErrorOccurred:
80       stream_error = CFReadStreamCopyError(stream);
81       error = grpc_error_set_int(
82           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
83           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
84       CFRelease(stream_error);
85       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
86       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
87       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
88       GRPC_ERROR_UNREF(error);
89       break;
90     default:
91       GPR_UNREACHABLE_CODE(return );
92   }
93 }
94 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
95                                    CFStreamEventType type,
96                                    void* clientCallBackInfo) {
97   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
98   grpc_core::ExecCtx exec_ctx;
99   grpc_error* error;
100   CFErrorRef stream_error;
101   CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
102   if (grpc_tcp_trace.enabled()) {
103     gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
104             stream, type, clientCallBackInfo);
105   }
106   switch (type) {
107     case kCFStreamEventOpenCompleted:
108       handle->open_event_.SetReady();
109       break;
110     case kCFStreamEventCanAcceptBytes:
111     case kCFStreamEventEndEncountered:
112       handle->write_event_.SetReady();
113       break;
114     case kCFStreamEventErrorOccurred:
115       stream_error = CFWriteStreamCopyError(stream);
116       error = grpc_error_set_int(
117           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
118           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
119       CFRelease(stream_error);
120       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
121       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
122       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
123       GRPC_ERROR_UNREF(error);
124       break;
125     default:
126       GPR_UNREACHABLE_CODE(return );
127   }
128 }
129
130 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
131                                CFWriteStreamRef write_stream) {
132   gpr_ref_init(&refcount_, 1);
133   open_event_.InitEvent();
134   read_event_.InitEvent();
135   write_event_.InitEvent();
136   dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
137   CFStreamClientContext ctx = {0, static_cast<void*>(this),
138                                CFStreamHandle::Retain, CFStreamHandle::Release,
139                                nil};
140   CFReadStreamSetClient(
141       read_stream,
142       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
143           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
144       CFStreamHandle::ReadCallback, &ctx);
145   CFWriteStreamSetClient(
146       write_stream,
147       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
148           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
149       CFStreamHandle::WriteCallback, &ctx);
150   CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_);
151   CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_);
152 }
153
154 CFStreamHandle::~CFStreamHandle() {
155   open_event_.DestroyEvent();
156   read_event_.DestroyEvent();
157   write_event_.DestroyEvent();
158 }
159
160 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
161   open_event_.NotifyOn(closure);
162 }
163
164 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
165   read_event_.NotifyOn(closure);
166 }
167
168 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
169   write_event_.NotifyOn(closure);
170 }
171
172 void CFStreamHandle::Shutdown(grpc_error* error) {
173   open_event_.SetShutdown(GRPC_ERROR_REF(error));
174   read_event_.SetShutdown(GRPC_ERROR_REF(error));
175   write_event_.SetShutdown(GRPC_ERROR_REF(error));
176   GRPC_ERROR_UNREF(error);
177 }
178
179 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
180   if (grpc_tcp_trace.enabled()) {
181     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
182     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
183             "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
184             reason, val, val + 1);
185   }
186   gpr_ref(&refcount_);
187 }
188
189 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
190   if (grpc_tcp_trace.enabled()) {
191     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
192     gpr_log(GPR_DEBUG,
193             "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
194             reason, val, val - 1);
195   }
196   if (gpr_unref(&refcount_)) {
197     grpc_core::Delete<CFStreamHandle>(this);
198   }
199 }
200
201 #else
202
203 /* Creating a dummy function so that the grpc_cfstream library will be
204  * non-empty.
205  */
206 void CFStreamDummy() {}
207
208 #endif