3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
22 #include <grpc/support/port_platform.h>
24 #include <grpc/slice_buffer.h>
25 #include "src/core/lib/gprpp/abstract.h"
26 #include "src/core/lib/gprpp/orphanable.h"
27 #include "src/core/lib/iomgr/closure.h"
29 /** Internal bit flag for grpc_begin_message's \a flags signaling the use of
30 * compression for the message.
 * The value 0x80000000u has only bit 31 set. */
31 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
32 /** Mask of all valid internal flags.
 * Expands to GRPC_WRITE_INTERNAL_COMPRESS alone, the only internal flag
 * defined alongside it here. */
33 #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
// Abstract interface for a stream of bytes delivered as slices.
//
// Derives from Orphanable. Next(), Pull() and Shutdown() are declared
// abstract (GRPC_ABSTRACT) and must be supplied by concrete streams; the
// length and flags values passed to the constructor are exposed through
// the accessors below.
37 class ByteStream : public Orphanable {
// Virtual so that a derived stream can be destroyed through a ByteStream
// pointer.
39 virtual ~ByteStream() {}
41 // Returns true if the bytes are available immediately (in which case
42 // on_complete will not be called), or false if the bytes will be available
43 // asynchronously (in which case on_complete will be called when they
// are available).
//
46 // max_size_hint can be set as a hint as to the maximum number
47 // of bytes that would be acceptable to read.
48 virtual bool Next(size_t max_size_hint,
49 grpc_closure* on_complete) GRPC_ABSTRACT;
51 // Returns the next slice in the byte stream when it is available, as
52 // indicated by Next().
//
54 // Once a slice is returned into *slice, it is owned by the caller.
55 virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;
57 // Shuts down the byte stream.
//
59 // If there is a pending call to on_complete from Next(), it will be
60 // invoked with the error passed to Shutdown().
//
62 // The next call to Pull() (if any) will return the error passed to
// Shutdown().
64 virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;
// Returns the length value given at construction. length_ is const, so it
// never changes afterwards. (Presumably the total number of bytes in the
// stream -- TODO confirm against implementations.)
66 uint32_t length() const { return length_; }
// Returns the current flags value.
67 uint32_t flags() const { return flags_; }
// Overwrites the flags value with the given one.
69 void set_flags(uint32_t flags) { flags_ = flags; }
71 GRPC_ABSTRACT_BASE_CLASS
// Stores length and flags in length_ and flags_ respectively.
74 ByteStream(uint32_t length, uint32_t flags)
75 : length_(length), flags_(flags) {}
// Fixed at construction; read through length().
78 const uint32_t length_;
83 // SliceBufferByteStream
85 // A ByteStream that wraps a slice buffer.
//
// Overrides Orphan() plus the three abstract ByteStream operations
// (Next, Pull, Shutdown); see ByteStream for their contracts.
88 class SliceBufferByteStream : public ByteStream {
90 // Removes all slices in slice_buffer, leaving it empty.
// flags is the flags value for the new stream (presumably forwarded to the
// ByteStream constructor -- the definition is not in this header).
91 SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
93 ~SliceBufferByteStream();
95 void Orphan() override;
// ByteStream interface.
97 bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
98 grpc_error* Pull(grpc_slice* slice) override;
99 void Shutdown(grpc_error* error) override;
// Starts out as GRPC_ERROR_NONE. Presumably records the error passed to
// Shutdown() so that a later Pull() can return it -- TODO confirm in the
// implementation file.
102 grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
// Slice buffer held by value inside the stream. Presumably receives the
// slices removed from the constructor's slice_buffer -- TODO confirm.
103 grpc_slice_buffer backing_buffer_;
109 // A ByteStream that wraps an underlying byte stream but caches
110 // the resulting slices in a slice buffer. If an initial attempt fails
111 // without fully draining the underlying stream, a new caching stream
112 // can be created from the same underlying cache, in which case it will
113 // return whatever is in the backing buffer before continuing to read the
114 // underlying stream.
116 // NOTE: No synchronization is done, so it is not safe to have multiple
117 // CachingByteStreams simultaneously drawing from the same underlying
118 // ByteStreamCache at the same time.
// Holds an underlying ByteStream together with a slice buffer
// (cache_buffer_), and declares the nested CachingByteStream type that is
// constructed from a pointer to this cache.
121 class ByteStreamCache {
// ByteStream implementation bound to a ByteStreamCache. Overrides Orphan()
// plus the three abstract ByteStream operations (Next, Pull, Shutdown).
123 class CachingByteStream : public ByteStream {
// cache: the ByteStreamCache this stream is bound to (presumably stored in
// cache_ -- the definition is not in this header).
125 explicit CachingByteStream(ByteStreamCache* cache);
127 ~CachingByteStream();
129 void Orphan() override;
// ByteStream interface.
131 bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
132 grpc_error* Pull(grpc_slice* slice) override;
133 void Shutdown(grpc_error* error) override;
135 // Resets the byte stream to the start of the underlying stream.
// Raw pointer to the associated cache. NOTE(review): looks non-owning, so
// the cache would have to outlive this stream -- confirm.
139 ByteStreamCache* cache_;
// Starts out as GRPC_ERROR_NONE. Presumably records the error passed to
// Shutdown() -- TODO confirm in the implementation file.
142 grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
// Takes underlying_stream as an OrphanablePtr by value (presumably kept in
// underlying_stream_ -- confirm in the implementation file).
145 explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
149 // Must not be destroyed while still in use by a CachingByteStream.
// Returns a pointer to the internal cache_buffer_ member; the pointer
// refers to storage inside this object.
152 grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
// The wrapped stream, held through an OrphanablePtr.
155 OrphanablePtr<ByteStream> underlying_stream_;
// Slice buffer held by value; exposed through cache_buffer().
158 grpc_slice_buffer cache_buffer_;
161 } // namespace grpc_core
163 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */