| 30 |
ashish |
1 |
/*
|
|
|
2 |
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
* or more contributor license agreements. See the NOTICE file
|
|
|
4 |
* distributed with this work for additional information
|
|
|
5 |
* regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
* to you under the Apache License, Version 2.0 (the
|
|
|
7 |
* "License"); you may not use this file except in compliance
|
|
|
8 |
* with the License. You may obtain a copy of the License at
|
|
|
9 |
*
|
|
|
10 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
*
|
|
|
12 |
* Unless required by applicable law or agreed to in writing,
|
|
|
13 |
* software distributed under the License is distributed on an
|
|
|
14 |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
* KIND, either express or implied. See the License for the
|
|
|
16 |
* specific language governing permissions and limitations
|
|
|
17 |
* under the License.
|
|
|
18 |
*/
|
|
|
19 |
|
|
|
20 |
#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
|
|
|
21 |
#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
|
|
|
22 |
|
|
|
23 |
#include <cstring>
|
|
|
24 |
#include "boost/scoped_array.hpp"
|
|
|
25 |
|
|
|
26 |
#include <transport/TTransport.h>
|
|
|
27 |
|
|
|
28 |
#ifdef __GNUC__
|
|
|
29 |
#define TDB_LIKELY(val) (__builtin_expect((val), 1))
|
|
|
30 |
#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
|
|
|
31 |
#else
|
|
|
32 |
#define TDB_LIKELY(val) (val)
|
|
|
33 |
#define TDB_UNLIKELY(val) (val)
|
|
|
34 |
#endif
|
|
|
35 |
|
|
|
36 |
namespace apache { namespace thrift { namespace transport {
|
|
|
37 |
|
|
|
38 |
|
|
|
39 |
/**
|
|
|
40 |
* Base class for all transports that use read/write buffers for performance.
|
|
|
41 |
*
|
|
|
42 |
* TBufferBase is designed to implement the fast-path "memcpy" style
|
|
|
43 |
* operations that work in the common case. It does so with small and
|
|
|
44 |
* (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract
|
|
|
45 |
* class. Subclasses are expected to define the "slow path" operations
|
|
|
46 |
* that have to be done when the buffers are full or empty.
|
|
|
47 |
*
|
|
|
48 |
*/
|
|
|
49 |
class TBufferBase : public TTransport {
|
|
|
50 |
|
|
|
51 |
public:
|
|
|
52 |
|
|
|
53 |
/**
|
|
|
54 |
* Fast-path read.
|
|
|
55 |
*
|
|
|
56 |
* When we have enough data buffered to fulfill the read, we can satisfy it
|
|
|
57 |
* with a single memcpy, then adjust our internal pointers. If the buffer
|
|
|
58 |
* is empty, we call out to our slow path, implemented by a subclass.
|
|
|
59 |
* This method is meant to eventually be nonvirtual and inlinable.
|
|
|
60 |
*/
|
|
|
61 |
uint32_t read(uint8_t* buf, uint32_t len) {
|
|
|
62 |
uint8_t* new_rBase = rBase_ + len;
|
|
|
63 |
if (TDB_LIKELY(new_rBase <= rBound_)) {
|
|
|
64 |
std::memcpy(buf, rBase_, len);
|
|
|
65 |
rBase_ = new_rBase;
|
|
|
66 |
return len;
|
|
|
67 |
}
|
|
|
68 |
return readSlow(buf, len);
|
|
|
69 |
}
|
|
|
70 |
|
|
|
71 |
/**
|
|
|
72 |
* Fast-path write.
|
|
|
73 |
*
|
|
|
74 |
* When we have enough empty space in our buffer to accomodate the write, we
|
|
|
75 |
* can satisfy it with a single memcpy, then adjust our internal pointers.
|
|
|
76 |
* If the buffer is full, we call out to our slow path, implemented by a
|
|
|
77 |
* subclass. This method is meant to eventually be nonvirtual and
|
|
|
78 |
* inlinable.
|
|
|
79 |
*/
|
|
|
80 |
void write(const uint8_t* buf, uint32_t len) {
|
|
|
81 |
uint8_t* new_wBase = wBase_ + len;
|
|
|
82 |
if (TDB_LIKELY(new_wBase <= wBound_)) {
|
|
|
83 |
std::memcpy(wBase_, buf, len);
|
|
|
84 |
wBase_ = new_wBase;
|
|
|
85 |
return;
|
|
|
86 |
}
|
|
|
87 |
writeSlow(buf, len);
|
|
|
88 |
}
|
|
|
89 |
|
|
|
90 |
/**
|
|
|
91 |
* Fast-path borrow. A lot like the fast-path read.
|
|
|
92 |
*/
|
|
|
93 |
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
|
|
|
94 |
if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
|
|
|
95 |
// With strict aliasing, writing to len shouldn't force us to
|
|
|
96 |
// refetch rBase_ from memory. TODO(dreiss): Verify this.
|
|
|
97 |
*len = rBound_ - rBase_;
|
|
|
98 |
return rBase_;
|
|
|
99 |
}
|
|
|
100 |
return borrowSlow(buf, len);
|
|
|
101 |
}
|
|
|
102 |
|
|
|
103 |
/**
|
|
|
104 |
* Consume doesn't require a slow path.
|
|
|
105 |
*/
|
|
|
106 |
void consume(uint32_t len) {
|
|
|
107 |
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
|
|
|
108 |
rBase_ += len;
|
|
|
109 |
} else {
|
|
|
110 |
throw TTransportException(TTransportException::BAD_ARGS,
|
|
|
111 |
"consume did not follow a borrow.");
|
|
|
112 |
}
|
|
|
113 |
}
|
|
|
114 |
|
|
|
115 |
|
|
|
116 |
protected:
|
|
|
117 |
|
|
|
118 |
/// Slow path read.
|
|
|
119 |
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
|
|
|
120 |
|
|
|
121 |
/// Slow path write.
|
|
|
122 |
virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
|
|
|
123 |
|
|
|
124 |
/**
|
|
|
125 |
* Slow path borrow.
|
|
|
126 |
*
|
|
|
127 |
* POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
|
|
|
128 |
*/
|
|
|
129 |
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
|
|
|
130 |
|
|
|
131 |
/**
|
|
|
132 |
* Trivial constructor.
|
|
|
133 |
*
|
|
|
134 |
* Initialize pointers safely. Constructing is not a very
|
|
|
135 |
* performance-sensitive operation, so it is okay to just leave it to
|
|
|
136 |
* the concrete class to set up pointers correctly.
|
|
|
137 |
*/
|
|
|
138 |
TBufferBase()
|
|
|
139 |
: rBase_(NULL)
|
|
|
140 |
, rBound_(NULL)
|
|
|
141 |
, wBase_(NULL)
|
|
|
142 |
, wBound_(NULL)
|
|
|
143 |
{}
|
|
|
144 |
|
|
|
145 |
/// Convenience mutator for setting the read buffer.
|
|
|
146 |
void setReadBuffer(uint8_t* buf, uint32_t len) {
|
|
|
147 |
rBase_ = buf;
|
|
|
148 |
rBound_ = buf+len;
|
|
|
149 |
}
|
|
|
150 |
|
|
|
151 |
/// Convenience mutator for setting the write buffer.
|
|
|
152 |
void setWriteBuffer(uint8_t* buf, uint32_t len) {
|
|
|
153 |
wBase_ = buf;
|
|
|
154 |
wBound_ = buf+len;
|
|
|
155 |
}
|
|
|
156 |
|
|
|
157 |
virtual ~TBufferBase() {}
|
|
|
158 |
|
|
|
159 |
/// Reads begin here.
|
|
|
160 |
uint8_t* rBase_;
|
|
|
161 |
/// Reads may extend to just before here.
|
|
|
162 |
uint8_t* rBound_;
|
|
|
163 |
|
|
|
164 |
/// Writes begin here.
|
|
|
165 |
uint8_t* wBase_;
|
|
|
166 |
/// Writes may extend to just before here.
|
|
|
167 |
uint8_t* wBound_;
|
|
|
168 |
};
|
|
|
169 |
|
|
|
170 |
|
|
|
171 |
/**
|
|
|
172 |
* Base class for all transport which wraps transport to new one.
|
|
|
173 |
*/
|
|
|
174 |
class TUnderlyingTransport : public TBufferBase {
|
|
|
175 |
public:
|
|
|
176 |
static const int DEFAULT_BUFFER_SIZE = 512;
|
|
|
177 |
|
|
|
178 |
virtual bool peek() {
|
|
|
179 |
return (rBase_ < rBound_) || transport_->peek();
|
|
|
180 |
}
|
|
|
181 |
|
|
|
182 |
void open() {
|
|
|
183 |
transport_->open();
|
|
|
184 |
}
|
|
|
185 |
|
|
|
186 |
bool isOpen() {
|
|
|
187 |
return transport_->isOpen();
|
|
|
188 |
}
|
|
|
189 |
|
|
|
190 |
void close() {
|
|
|
191 |
flush();
|
|
|
192 |
transport_->close();
|
|
|
193 |
}
|
|
|
194 |
|
|
|
195 |
boost::shared_ptr<TTransport> getUnderlyingTransport() {
|
|
|
196 |
return transport_;
|
|
|
197 |
}
|
|
|
198 |
|
|
|
199 |
protected:
|
|
|
200 |
boost::shared_ptr<TTransport> transport_;
|
|
|
201 |
|
|
|
202 |
uint32_t rBufSize_;
|
|
|
203 |
uint32_t wBufSize_;
|
|
|
204 |
boost::scoped_array<uint8_t> rBuf_;
|
|
|
205 |
boost::scoped_array<uint8_t> wBuf_;
|
|
|
206 |
|
|
|
207 |
TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
|
|
|
208 |
: transport_(transport)
|
|
|
209 |
, rBufSize_(sz)
|
|
|
210 |
, wBufSize_(sz)
|
|
|
211 |
, rBuf_(new uint8_t[rBufSize_])
|
|
|
212 |
, wBuf_(new uint8_t[wBufSize_]) {}
|
|
|
213 |
|
|
|
214 |
TUnderlyingTransport(boost::shared_ptr<TTransport> transport)
|
|
|
215 |
: transport_(transport)
|
|
|
216 |
, rBufSize_(DEFAULT_BUFFER_SIZE)
|
|
|
217 |
, wBufSize_(DEFAULT_BUFFER_SIZE)
|
|
|
218 |
, rBuf_(new uint8_t[rBufSize_])
|
|
|
219 |
, wBuf_(new uint8_t[wBufSize_]) {}
|
|
|
220 |
|
|
|
221 |
TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
|
|
|
222 |
: transport_(transport)
|
|
|
223 |
, rBufSize_(rsz)
|
|
|
224 |
, wBufSize_(wsz)
|
|
|
225 |
, rBuf_(new uint8_t[rBufSize_])
|
|
|
226 |
, wBuf_(new uint8_t[wBufSize_]) {}
|
|
|
227 |
};
|
|
|
228 |
|
|
|
229 |
/**
|
|
|
230 |
* Buffered transport. For reads it will read more data than is requested
|
|
|
231 |
* and will serve future data out of a local buffer. For writes, data is
|
|
|
232 |
* stored to an in memory buffer before being written out.
|
|
|
233 |
*
|
|
|
234 |
*/
|
|
|
235 |
class TBufferedTransport : public TUnderlyingTransport {
|
|
|
236 |
public:
|
|
|
237 |
|
|
|
238 |
/// Use default buffer sizes.
|
|
|
239 |
TBufferedTransport(boost::shared_ptr<TTransport> transport)
|
|
|
240 |
: TUnderlyingTransport(transport)
|
|
|
241 |
{
|
|
|
242 |
initPointers();
|
|
|
243 |
}
|
|
|
244 |
|
|
|
245 |
/// Use specified buffer sizes.
|
|
|
246 |
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
|
|
|
247 |
: TUnderlyingTransport(transport, sz)
|
|
|
248 |
{
|
|
|
249 |
initPointers();
|
|
|
250 |
}
|
|
|
251 |
|
|
|
252 |
/// Use specified read and write buffer sizes.
|
|
|
253 |
TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
|
|
|
254 |
: TUnderlyingTransport(transport, rsz, wsz)
|
|
|
255 |
{
|
|
|
256 |
initPointers();
|
|
|
257 |
}
|
|
|
258 |
|
|
|
259 |
virtual bool peek() {
|
|
|
260 |
/* shigin: see THRIFT-96 discussion */
|
|
|
261 |
if (rBase_ == rBound_) {
|
|
|
262 |
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
|
|
|
263 |
}
|
|
|
264 |
return (rBound_ > rBase_);
|
|
|
265 |
}
|
|
|
266 |
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
|
|
|
267 |
|
|
|
268 |
virtual void writeSlow(const uint8_t* buf, uint32_t len);
|
|
|
269 |
|
|
|
270 |
void flush();
|
|
|
271 |
|
|
|
272 |
|
|
|
273 |
/**
|
|
|
274 |
* The following behavior is currently implemented by TBufferedTransport,
|
|
|
275 |
* but that may change in a future version:
|
|
|
276 |
* 1/ If len is at most rBufSize_, borrow will never return NULL.
|
|
|
277 |
* Depending on the underlying transport, it could throw an exception
|
|
|
278 |
* or hang forever.
|
|
|
279 |
* 2/ Some borrow requests may copy bytes internally. However,
|
|
|
280 |
* if len is at most rBufSize_/2, none of the copied bytes
|
|
|
281 |
* will ever have to be copied again. For optimial performance,
|
|
|
282 |
* stay under this limit.
|
|
|
283 |
*/
|
|
|
284 |
virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
|
|
285 |
|
|
|
286 |
protected:
|
|
|
287 |
void initPointers() {
|
|
|
288 |
setReadBuffer(rBuf_.get(), 0);
|
|
|
289 |
setWriteBuffer(wBuf_.get(), wBufSize_);
|
|
|
290 |
// Write size never changes.
|
|
|
291 |
}
|
|
|
292 |
};
|
|
|
293 |
|
|
|
294 |
|
|
|
295 |
/**
|
|
|
296 |
* Wraps a transport into a buffered one.
|
|
|
297 |
*
|
|
|
298 |
*/
|
|
|
299 |
class TBufferedTransportFactory : public TTransportFactory {
|
|
|
300 |
public:
|
|
|
301 |
TBufferedTransportFactory() {}
|
|
|
302 |
|
|
|
303 |
virtual ~TBufferedTransportFactory() {}
|
|
|
304 |
|
|
|
305 |
/**
|
|
|
306 |
* Wraps the transport into a buffered one.
|
|
|
307 |
*/
|
|
|
308 |
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
|
|
309 |
return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
|
|
|
310 |
}
|
|
|
311 |
|
|
|
312 |
};
|
|
|
313 |
|
|
|
314 |
|
|
|
315 |
/**
|
|
|
316 |
* Framed transport. All writes go into an in-memory buffer until flush is
|
|
|
317 |
* called, at which point the transport writes the length of the entire
|
|
|
318 |
* binary chunk followed by the data payload. This allows the receiver on the
|
|
|
319 |
* other end to always do fixed-length reads.
|
|
|
320 |
*
|
|
|
321 |
*/
|
|
|
322 |
class TFramedTransport : public TUnderlyingTransport {
|
|
|
323 |
public:
|
|
|
324 |
|
|
|
325 |
/// Use default buffer sizes.
|
|
|
326 |
TFramedTransport(boost::shared_ptr<TTransport> transport)
|
|
|
327 |
: TUnderlyingTransport(transport)
|
|
|
328 |
{
|
|
|
329 |
initPointers();
|
|
|
330 |
}
|
|
|
331 |
|
|
|
332 |
TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
|
|
|
333 |
: TUnderlyingTransport(transport, sz)
|
|
|
334 |
{
|
|
|
335 |
initPointers();
|
|
|
336 |
}
|
|
|
337 |
|
|
|
338 |
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
|
|
|
339 |
|
|
|
340 |
virtual void writeSlow(const uint8_t* buf, uint32_t len);
|
|
|
341 |
|
|
|
342 |
virtual void flush();
|
|
|
343 |
|
|
|
344 |
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
|
|
345 |
|
|
|
346 |
protected:
|
|
|
347 |
/**
|
|
|
348 |
* Reads a frame of input from the underlying stream.
|
|
|
349 |
*/
|
|
|
350 |
void readFrame();
|
|
|
351 |
|
|
|
352 |
void initPointers() {
|
|
|
353 |
setReadBuffer(NULL, 0);
|
|
|
354 |
setWriteBuffer(wBuf_.get(), wBufSize_);
|
|
|
355 |
|
|
|
356 |
// Pad the buffer so we can insert the size later.
|
|
|
357 |
int32_t pad = 0;
|
|
|
358 |
this->write((uint8_t*)&pad, sizeof(pad));
|
|
|
359 |
}
|
|
|
360 |
};
|
|
|
361 |
|
|
|
362 |
/**
|
|
|
363 |
* Wraps a transport into a framed one.
|
|
|
364 |
*
|
|
|
365 |
*/
|
|
|
366 |
class TFramedTransportFactory : public TTransportFactory {
|
|
|
367 |
public:
|
|
|
368 |
TFramedTransportFactory() {}
|
|
|
369 |
|
|
|
370 |
virtual ~TFramedTransportFactory() {}
|
|
|
371 |
|
|
|
372 |
/**
|
|
|
373 |
* Wraps the transport into a framed one.
|
|
|
374 |
*/
|
|
|
375 |
virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
|
|
|
376 |
return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
|
|
|
377 |
}
|
|
|
378 |
|
|
|
379 |
};
|
|
|
380 |
|
|
|
381 |
|
|
|
382 |
/**
|
|
|
383 |
* A memory buffer is a tranpsort that simply reads from and writes to an
|
|
|
384 |
* in memory buffer. Anytime you call write on it, the data is simply placed
|
|
|
385 |
* into a buffer, and anytime you call read, data is read from that buffer.
|
|
|
386 |
*
|
|
|
387 |
* The buffers are allocated using C constructs malloc,realloc, and the size
|
|
|
388 |
* doubles as necessary. We've considered using scoped
|
|
|
389 |
*
|
|
|
390 |
*/
|
|
|
391 |
class TMemoryBuffer : public TBufferBase {
|
|
|
392 |
private:
|
|
|
393 |
|
|
|
394 |
// Common initialization done by all constructors.
|
|
|
395 |
void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
|
|
|
396 |
if (buf == NULL && size != 0) {
|
|
|
397 |
assert(owner);
|
|
|
398 |
buf = (uint8_t*)std::malloc(size);
|
|
|
399 |
if (buf == NULL) {
|
|
|
400 |
throw TTransportException("Out of memory");
|
|
|
401 |
}
|
|
|
402 |
}
|
|
|
403 |
|
|
|
404 |
buffer_ = buf;
|
|
|
405 |
bufferSize_ = size;
|
|
|
406 |
|
|
|
407 |
rBase_ = buffer_;
|
|
|
408 |
rBound_ = buffer_ + wPos;
|
|
|
409 |
// TODO(dreiss): Investigate NULL-ing this if !owner.
|
|
|
410 |
wBase_ = buffer_ + wPos;
|
|
|
411 |
wBound_ = buffer_ + bufferSize_;
|
|
|
412 |
|
|
|
413 |
owner_ = owner;
|
|
|
414 |
|
|
|
415 |
// rBound_ is really an artifact. In principle, it should always be
|
|
|
416 |
// equal to wBase_. We update it in a few places (computeRead, etc.).
|
|
|
417 |
}
|
|
|
418 |
|
|
|
419 |
public:
|
|
|
420 |
static const uint32_t defaultSize = 1024;
|
|
|
421 |
|
|
|
422 |
/**
|
|
|
423 |
* This enum specifies how a TMemoryBuffer should treat
|
|
|
424 |
* memory passed to it via constructors or resetBuffer.
|
|
|
425 |
*
|
|
|
426 |
* OBSERVE:
|
|
|
427 |
* TMemoryBuffer will simply store a pointer to the memory.
|
|
|
428 |
* It is the callers responsibility to ensure that the pointer
|
|
|
429 |
* remains valid for the lifetime of the TMemoryBuffer,
|
|
|
430 |
* and that it is properly cleaned up.
|
|
|
431 |
* Note that no data can be written to observed buffers.
|
|
|
432 |
*
|
|
|
433 |
* COPY:
|
|
|
434 |
* TMemoryBuffer will make an internal copy of the buffer.
|
|
|
435 |
* The caller has no responsibilities.
|
|
|
436 |
*
|
|
|
437 |
* TAKE_OWNERSHIP:
|
|
|
438 |
* TMemoryBuffer will become the "owner" of the buffer,
|
|
|
439 |
* and will be responsible for freeing it.
|
|
|
440 |
* The membory must have been allocated with malloc.
|
|
|
441 |
*/
|
|
|
442 |
enum MemoryPolicy
|
|
|
443 |
{ OBSERVE = 1
|
|
|
444 |
, COPY = 2
|
|
|
445 |
, TAKE_OWNERSHIP = 3
|
|
|
446 |
};
|
|
|
447 |
|
|
|
448 |
/**
|
|
|
449 |
* Construct a TMemoryBuffer with a default-sized buffer,
|
|
|
450 |
* owned by the TMemoryBuffer object.
|
|
|
451 |
*/
|
|
|
452 |
TMemoryBuffer() {
|
|
|
453 |
initCommon(NULL, defaultSize, true, 0);
|
|
|
454 |
}
|
|
|
455 |
|
|
|
456 |
/**
|
|
|
457 |
* Construct a TMemoryBuffer with a buffer of a specified size,
|
|
|
458 |
* owned by the TMemoryBuffer object.
|
|
|
459 |
*
|
|
|
460 |
* @param sz The initial size of the buffer.
|
|
|
461 |
*/
|
|
|
462 |
TMemoryBuffer(uint32_t sz) {
|
|
|
463 |
initCommon(NULL, sz, true, 0);
|
|
|
464 |
}
|
|
|
465 |
|
|
|
466 |
/**
|
|
|
467 |
* Construct a TMemoryBuffer with buf as its initial contents.
|
|
|
468 |
*
|
|
|
469 |
* @param buf The initial contents of the buffer.
|
|
|
470 |
* Note that, while buf is a non-const pointer,
|
|
|
471 |
* TMemoryBuffer will not write to it if policy == OBSERVE,
|
|
|
472 |
* so it is safe to const_cast<uint8_t*>(whatever).
|
|
|
473 |
* @param sz The size of @c buf.
|
|
|
474 |
* @param policy See @link MemoryPolicy @endlink .
|
|
|
475 |
*/
|
|
|
476 |
TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
|
|
477 |
if (buf == NULL && sz != 0) {
|
|
|
478 |
throw TTransportException(TTransportException::BAD_ARGS,
|
|
|
479 |
"TMemoryBuffer given null buffer with non-zero size.");
|
|
|
480 |
}
|
|
|
481 |
|
|
|
482 |
switch (policy) {
|
|
|
483 |
case OBSERVE:
|
|
|
484 |
case TAKE_OWNERSHIP:
|
|
|
485 |
initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
|
|
|
486 |
break;
|
|
|
487 |
case COPY:
|
|
|
488 |
initCommon(NULL, sz, true, 0);
|
|
|
489 |
this->write(buf, sz);
|
|
|
490 |
break;
|
|
|
491 |
default:
|
|
|
492 |
throw TTransportException(TTransportException::BAD_ARGS,
|
|
|
493 |
"Invalid MemoryPolicy for TMemoryBuffer");
|
|
|
494 |
}
|
|
|
495 |
}
|
|
|
496 |
|
|
|
497 |
~TMemoryBuffer() {
|
|
|
498 |
if (owner_) {
|
|
|
499 |
std::free(buffer_);
|
|
|
500 |
}
|
|
|
501 |
}
|
|
|
502 |
|
|
|
503 |
bool isOpen() {
|
|
|
504 |
return true;
|
|
|
505 |
}
|
|
|
506 |
|
|
|
507 |
bool peek() {
|
|
|
508 |
return (rBase_ < wBase_);
|
|
|
509 |
}
|
|
|
510 |
|
|
|
511 |
void open() {}
|
|
|
512 |
|
|
|
513 |
void close() {}
|
|
|
514 |
|
|
|
515 |
// TODO(dreiss): Make bufPtr const.
|
|
|
516 |
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
|
|
|
517 |
*bufPtr = rBase_;
|
|
|
518 |
*sz = wBase_ - rBase_;
|
|
|
519 |
}
|
|
|
520 |
|
|
|
521 |
std::string getBufferAsString() {
|
|
|
522 |
if (buffer_ == NULL) {
|
|
|
523 |
return "";
|
|
|
524 |
}
|
|
|
525 |
uint8_t* buf;
|
|
|
526 |
uint32_t sz;
|
|
|
527 |
getBuffer(&buf, &sz);
|
|
|
528 |
return std::string((char*)buf, (std::string::size_type)sz);
|
|
|
529 |
}
|
|
|
530 |
|
|
|
531 |
void appendBufferToString(std::string& str) {
|
|
|
532 |
if (buffer_ == NULL) {
|
|
|
533 |
return;
|
|
|
534 |
}
|
|
|
535 |
uint8_t* buf;
|
|
|
536 |
uint32_t sz;
|
|
|
537 |
getBuffer(&buf, &sz);
|
|
|
538 |
str.append((char*)buf, sz);
|
|
|
539 |
}
|
|
|
540 |
|
|
|
541 |
void resetBuffer(bool reset_capacity = false) {
|
|
|
542 |
if (reset_capacity)
|
|
|
543 |
{
|
|
|
544 |
assert(owner_);
|
|
|
545 |
|
|
|
546 |
void* new_buffer = std::realloc(buffer_, defaultSize);
|
|
|
547 |
|
|
|
548 |
if (new_buffer == NULL) {
|
|
|
549 |
throw TTransportException("Out of memory.");
|
|
|
550 |
}
|
|
|
551 |
|
|
|
552 |
buffer_ = (uint8_t*) new_buffer;
|
|
|
553 |
bufferSize_ = defaultSize;
|
|
|
554 |
|
|
|
555 |
wBound_ = buffer_ + bufferSize_;
|
|
|
556 |
}
|
|
|
557 |
|
|
|
558 |
rBase_ = buffer_;
|
|
|
559 |
rBound_ = buffer_;
|
|
|
560 |
wBase_ = buffer_;
|
|
|
561 |
// It isn't safe to write into a buffer we don't own.
|
|
|
562 |
if (!owner_) {
|
|
|
563 |
wBound_ = wBase_;
|
|
|
564 |
bufferSize_ = 0;
|
|
|
565 |
}
|
|
|
566 |
}
|
|
|
567 |
|
|
|
568 |
/// See constructor documentation.
|
|
|
569 |
void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
|
|
|
570 |
// Use a variant of the copy-and-swap trick for assignment operators.
|
|
|
571 |
// This is sub-optimal in terms of performance for two reasons:
|
|
|
572 |
// 1/ The constructing and swapping of the (small) values
|
|
|
573 |
// in the temporary object takes some time, and is not necessary.
|
|
|
574 |
// 2/ If policy == COPY, we allocate the new buffer before
|
|
|
575 |
// freeing the old one, precluding the possibility of
|
|
|
576 |
// reusing that memory.
|
|
|
577 |
// I doubt that either of these problems could be optimized away,
|
|
|
578 |
// but the second is probably no a common case, and the first is minor.
|
|
|
579 |
// I don't expect resetBuffer to be a common operation, so I'm willing to
|
|
|
580 |
// bite the performance bullet to make the method this simple.
|
|
|
581 |
|
|
|
582 |
// Construct the new buffer.
|
|
|
583 |
TMemoryBuffer new_buffer(buf, sz, policy);
|
|
|
584 |
// Move it into ourself.
|
|
|
585 |
this->swap(new_buffer);
|
|
|
586 |
// Our old self gets destroyed.
|
|
|
587 |
}
|
|
|
588 |
|
|
|
589 |
std::string readAsString(uint32_t len) {
|
|
|
590 |
std::string str;
|
|
|
591 |
(void)readAppendToString(str, len);
|
|
|
592 |
return str;
|
|
|
593 |
}
|
|
|
594 |
|
|
|
595 |
uint32_t readAppendToString(std::string& str, uint32_t len);
|
|
|
596 |
|
|
|
597 |
void readEnd() {
|
|
|
598 |
if (rBase_ == wBase_) {
|
|
|
599 |
resetBuffer();
|
|
|
600 |
}
|
|
|
601 |
}
|
|
|
602 |
|
|
|
603 |
uint32_t available_read() const {
|
|
|
604 |
// Remember, wBase_ is the real rBound_.
|
|
|
605 |
return wBase_ - rBase_;
|
|
|
606 |
}
|
|
|
607 |
|
|
|
608 |
uint32_t available_write() const {
|
|
|
609 |
return wBound_ - wBase_;
|
|
|
610 |
}
|
|
|
611 |
|
|
|
612 |
// Returns a pointer to where the client can write data to append to
|
|
|
613 |
// the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
|
|
|
614 |
// write of the provided length. The returned pointer is very convenient for
|
|
|
615 |
// passing to read(), recv(), or similar. You must call wroteBytes() as soon
|
|
|
616 |
// as data is written or the buffer will not be aware that data has changed.
|
|
|
617 |
uint8_t* getWritePtr(uint32_t len) {
|
|
|
618 |
ensureCanWrite(len);
|
|
|
619 |
return wBase_;
|
|
|
620 |
}
|
|
|
621 |
|
|
|
622 |
// Informs the buffer that the client has written 'len' bytes into storage
|
|
|
623 |
// that had been provided by getWritePtr().
|
|
|
624 |
void wroteBytes(uint32_t len);
|
|
|
625 |
|
|
|
626 |
protected:
|
|
|
627 |
void swap(TMemoryBuffer& that) {
|
|
|
628 |
using std::swap;
|
|
|
629 |
swap(buffer_, that.buffer_);
|
|
|
630 |
swap(bufferSize_, that.bufferSize_);
|
|
|
631 |
|
|
|
632 |
swap(rBase_, that.rBase_);
|
|
|
633 |
swap(rBound_, that.rBound_);
|
|
|
634 |
swap(wBase_, that.wBase_);
|
|
|
635 |
swap(wBound_, that.wBound_);
|
|
|
636 |
|
|
|
637 |
swap(owner_, that.owner_);
|
|
|
638 |
}
|
|
|
639 |
|
|
|
640 |
// Make sure there's at least 'len' bytes available for writing.
|
|
|
641 |
void ensureCanWrite(uint32_t len);
|
|
|
642 |
|
|
|
643 |
// Compute the position and available data for reading.
|
|
|
644 |
void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
|
|
|
645 |
|
|
|
646 |
uint32_t readSlow(uint8_t* buf, uint32_t len);
|
|
|
647 |
|
|
|
648 |
void writeSlow(const uint8_t* buf, uint32_t len);
|
|
|
649 |
|
|
|
650 |
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
|
|
|
651 |
|
|
|
652 |
// Data buffer
|
|
|
653 |
uint8_t* buffer_;
|
|
|
654 |
|
|
|
655 |
// Allocated buffer size
|
|
|
656 |
uint32_t bufferSize_;
|
|
|
657 |
|
|
|
658 |
// Is this object the owner of the buffer?
|
|
|
659 |
bool owner_;
|
|
|
660 |
|
|
|
661 |
// Don't forget to update constrctors, initCommon, and swap if
|
|
|
662 |
// you add new members.
|
|
|
663 |
};
|
|
|
664 |
|
|
|
665 |
}}} // apache::thrift::transport
|
|
|
666 |
|
|
|
667 |
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
|