3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/cfstream_handle.h"
28 #include <grpc/grpc.h>
29 #include <grpc/support/atm.h>
30 #include <grpc/support/sync.h>
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/exec_ctx.h"
37 extern grpc_core::TraceFlag grpc_tcp_trace;
39 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
41 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
43 void* CFStreamHandle::Retain(void* info) {
44 CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
45 CFSTREAM_HANDLE_REF(handle, "retain");
49 void CFStreamHandle::Release(void* info) {
50 CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
51 CFSTREAM_HANDLE_UNREF(handle, "release");
54 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
55 CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
56 return grpc_core::New<CFStreamHandle>(read_stream, write_stream);
59 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
60 CFStreamEventType type,
61 void* client_callback_info) {
62 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
63 grpc_core::ExecCtx exec_ctx;
65 CFErrorRef stream_error;
66 CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
67 if (grpc_tcp_trace.enabled()) {
68 gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
69 stream, type, client_callback_info);
72 case kCFStreamEventOpenCompleted:
73 handle->open_event_.SetReady();
75 case kCFStreamEventHasBytesAvailable:
76 case kCFStreamEventEndEncountered:
77 handle->read_event_.SetReady();
79 case kCFStreamEventErrorOccurred:
80 stream_error = CFReadStreamCopyError(stream);
81 error = grpc_error_set_int(
82 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
83 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
84 CFRelease(stream_error);
85 handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
86 handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
87 handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
88 GRPC_ERROR_UNREF(error);
91 GPR_UNREACHABLE_CODE(return );
94 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
95 CFStreamEventType type,
96 void* clientCallBackInfo) {
97 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
98 grpc_core::ExecCtx exec_ctx;
100 CFErrorRef stream_error;
101 CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
102 if (grpc_tcp_trace.enabled()) {
103 gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
104 stream, type, clientCallBackInfo);
107 case kCFStreamEventOpenCompleted:
108 handle->open_event_.SetReady();
110 case kCFStreamEventCanAcceptBytes:
111 case kCFStreamEventEndEncountered:
112 handle->write_event_.SetReady();
114 case kCFStreamEventErrorOccurred:
115 stream_error = CFWriteStreamCopyError(stream);
116 error = grpc_error_set_int(
117 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
118 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
119 CFRelease(stream_error);
120 handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
121 handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
122 handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
123 GRPC_ERROR_UNREF(error);
126 GPR_UNREACHABLE_CODE(return );
130 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
131 CFWriteStreamRef write_stream) {
132 gpr_ref_init(&refcount_, 1);
133 open_event_.InitEvent();
134 read_event_.InitEvent();
135 write_event_.InitEvent();
136 dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
137 CFStreamClientContext ctx = {0, static_cast<void*>(this),
138 CFStreamHandle::Retain, CFStreamHandle::Release,
140 CFReadStreamSetClient(
142 kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
143 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
144 CFStreamHandle::ReadCallback, &ctx);
145 CFWriteStreamSetClient(
147 kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
148 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
149 CFStreamHandle::WriteCallback, &ctx);
150 CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_);
151 CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_);
154 CFStreamHandle::~CFStreamHandle() {
155 open_event_.DestroyEvent();
156 read_event_.DestroyEvent();
157 write_event_.DestroyEvent();
160 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
161 open_event_.NotifyOn(closure);
164 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
165 read_event_.NotifyOn(closure);
168 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
169 write_event_.NotifyOn(closure);
172 void CFStreamHandle::Shutdown(grpc_error* error) {
173 open_event_.SetShutdown(GRPC_ERROR_REF(error));
174 read_event_.SetShutdown(GRPC_ERROR_REF(error));
175 write_event_.SetShutdown(GRPC_ERROR_REF(error));
176 GRPC_ERROR_UNREF(error);
179 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
180 if (grpc_tcp_trace.enabled()) {
181 gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
182 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
183 "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
184 reason, val, val + 1);
189 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
190 if (grpc_tcp_trace.enabled()) {
191 gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
193 "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
194 reason, val, val - 1);
196 if (gpr_unref(&refcount_)) {
197 grpc_core::Delete<CFStreamHandle>(this);
203 /* Creating a dummy function so that the grpc_cfstream library will be
206 void CFStreamDummy() {}