3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/transport/byte_stream.h"
26 #include <grpc/support/log.h>
28 #include "src/core/lib/gprpp/memory.h"
29 #include "src/core/lib/slice/slice_internal.h"
34 // SliceBufferByteStream
// Constructs a byte stream that serves the slices currently held in
// |slice_buffer|. The length reported to the ByteStream base is
// slice_buffer->length narrowed to uint32_t.
37 SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
39 : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
// Guards the narrowing cast in the base initializer above; note that the
// cast has already been evaluated by the time this check runs.
40 GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
// Initialize backing_buffer_ and swap it with the caller's buffer, so this
// stream ends up holding the slices that were in |slice_buffer|.
41 grpc_slice_buffer_init(&backing_buffer_);
42 grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
// Empty destructor: the backing buffer and shutdown error are released in
// Orphan(), not here.
45 SliceBufferByteStream::~SliceBufferByteStream() {}
// Releases what the stream holds: destroys backing_buffer_ and drops this
// stream's reference to shutdown_error_.
47 void SliceBufferByteStream::Orphan() {
48 grpc_slice_buffer_destroy_internal(&backing_buffer_);
49 GRPC_ERROR_UNREF(shutdown_error_);
50 // Note: We do not actually delete the object here, since
51 // SliceBufferByteStream is usually allocated as part of a larger
52 // object and has an OrphanablePtr of itself passed down through the
// Next() for the slice-buffer-backed stream.
// In debug builds, asserts that backing_buffer_ still contains at least one
// slice when the caller asks for more data.
56 bool SliceBufferByteStream::Next(size_t max_size_hint,
57 grpc_closure* on_complete) {
58 GPR_DEBUG_ASSERT(backing_buffer_.count > 0);
// Hands the next slice to the caller through |slice|.
// If a shutdown error has been recorded, returns a new reference to that
// error instead of producing data (this branch is hinted as unlikely).
// Otherwise removes the first slice from backing_buffer_, stores it in
// |*slice| and returns GRPC_ERROR_NONE.
62 grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
63 if (GPR_UNLIKELY(shutdown_error_ != GRPC_ERROR_NONE)) {
64 return GRPC_ERROR_REF(shutdown_error_);
66 *slice = grpc_slice_buffer_take_first(&backing_buffer_);
67 return GRPC_ERROR_NONE;
// Records |error| as the stream's shutdown error, releasing the reference to
// any previously stored error first.
// |error| is stored as-is, without taking an additional reference.
// NOTE(review): this appears to take over the caller's reference to |error|
// -- confirm against callers.
70 void SliceBufferByteStream::Shutdown(grpc_error* error) {
71 GRPC_ERROR_UNREF(shutdown_error_);
72 shutdown_error_ = error;
// Wraps |underlying_stream| in a cache: moves the stream into
// underlying_stream_, copies its length and flags into length_ and flags_,
// and initializes the (initially empty) cache_buffer_.
// NOTE(review): length_ and flags_ are read through the member
// underlying_stream_, which relies on that member being declared before them
// in the class -- confirm in the header.
79 ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
80 : underlying_stream_(std::move(underlying_stream)),
81 length_(underlying_stream_->length()),
82 flags_(underlying_stream_->flags()) {
83 grpc_slice_buffer_init(&cache_buffer_);
// The destructor delegates all cleanup to Destroy().
86 ByteStreamCache::~ByteStreamCache() { Destroy(); }
// Releases the cache's resources: resets the underlying stream pointer and,
// when the cache buffer holds any bytes (length > 0), destroys cache_buffer_.
88 void ByteStreamCache::Destroy() {
89 underlying_stream_.reset();
90 if (cache_buffer_.length > 0) {
91 grpc_slice_buffer_destroy_internal(&cache_buffer_);
96 // ByteStreamCache::CachingByteStream
// Creates a stream that reads through |cache|. The ByteStream base is given
// the length and flags recorded in the cache, and the |cache| pointer is kept
// in cache_ for later use by Next(), Pull() and Shutdown().
99 ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
100 : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
// Empty destructor: the shutdown error reference is dropped in Orphan().
102 ByteStreamCache::CachingByteStream::~CachingByteStream() {}
// Drops this stream's reference to shutdown_error_.
104 void ByteStreamCache::CachingByteStream::Orphan() {
105 GRPC_ERROR_UNREF(shutdown_error_);
106 // Note: We do not actually delete the object here, since
107 // CachingByteStream is usually allocated as part of a larger
108 // object and has an OrphanablePtr of itself passed down through the
// Next() for the caching stream.
// Returns true immediately when a shutdown error has been recorded, or when
// cursor_ still points at a slice already present in the cache buffer.
// Otherwise asserts that the underlying stream still exists and forwards
// |max_size_hint| and |on_complete| to its Next(), returning that result.
112 bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
113 grpc_closure* on_complete) {
114 if (shutdown_error_ != GRPC_ERROR_NONE) return true;
115 if (cursor_ < cache_->cache_buffer_.count) return true;
116 GPR_ASSERT(cache_->underlying_stream_ != nullptr);
117 return cache_->underlying_stream_->Next(max_size_hint, on_complete);
// Pull() for the caching stream: produces the next slice through |slice|,
// serving it from the cache when possible and from the underlying stream
// otherwise.
120 grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
// After shutdown, hand back a new reference to the recorded error.
121 if (shutdown_error_ != GRPC_ERROR_NONE) {
122 return GRPC_ERROR_REF(shutdown_error_);
// Cache hit: return a new reference to the cached slice at cursor_ and
// advance offset_ by that slice's length.
124 if (cursor_ < cache_->cache_buffer_.count) {
125 *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
127 offset_ += GRPC_SLICE_LENGTH(*slice);
128 return GRPC_ERROR_NONE;
// Cache miss: the data has to come from the underlying stream, which must
// still exist at this point.
130 GPR_ASSERT(cache_->underlying_stream_ != nullptr);
131 grpc_error* error = cache_->underlying_stream_->Pull(slice);
// On success, keep an extra reference to the pulled slice in the cache buffer
// and advance offset_ by the slice's length.
132 if (error == GRPC_ERROR_NONE) {
133 grpc_slice_buffer_add(&cache_->cache_buffer_,
134 grpc_slice_ref_internal(*slice));
136 offset_ += GRPC_SLICE_LENGTH(*slice);
137 // Orphan the underlying stream if it's been drained.
138 if (offset_ == cache_->underlying_stream_->length()) {
139 cache_->underlying_stream_.reset();
// Records a shutdown error on this stream and propagates it downstream.
// The previously stored error is released and replaced with a new reference
// to |error|; if the cache still has an underlying stream, |error| itself is
// then passed on to that stream's Shutdown().
145 void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
146 GRPC_ERROR_UNREF(shutdown_error_);
147 shutdown_error_ = GRPC_ERROR_REF(error);
148 if (cache_->underlying_stream_ != nullptr) {
149 cache_->underlying_stream_->Shutdown(error);
153 void ByteStreamCache::CachingByteStream::Reset() {
158 } // namespace grpc_core