// Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
// (Navigation text left over from a repository web viewer; not part of the header.)
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing,* software distributed under the License is distributed on an* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY* KIND, either express or implied. See the License for the* specific language governing permissions and limitations* under the License.*/#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1#include <cstring>#include "boost/scoped_array.hpp"#include <transport/TTransport.h>#ifdef __GNUC__#define TDB_LIKELY(val) (__builtin_expect((val), 1))#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))#else#define TDB_LIKELY(val) (val)#define TDB_UNLIKELY(val) (val)#endifnamespace apache { namespace thrift { namespace transport {/*** Base class for all transports that use read/write buffers for performance.** TBufferBase is designed to implement the fast-path "memcpy" style* operations that work in the common case. It does so with small and* (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract* class. Subclasses are expected to define the "slow path" operations* that have to be done when the buffers are full or empty.**/class TBufferBase : public TTransport {public:/*** Fast-path read.** When we have enough data buffered to fulfill the read, we can satisfy it* with a single memcpy, then adjust our internal pointers. 
If the buffer* is empty, we call out to our slow path, implemented by a subclass.* This method is meant to eventually be nonvirtual and inlinable.*/uint32_t read(uint8_t* buf, uint32_t len) {uint8_t* new_rBase = rBase_ + len;if (TDB_LIKELY(new_rBase <= rBound_)) {std::memcpy(buf, rBase_, len);rBase_ = new_rBase;return len;}return readSlow(buf, len);}/*** Fast-path write.** When we have enough empty space in our buffer to accomodate the write, we* can satisfy it with a single memcpy, then adjust our internal pointers.* If the buffer is full, we call out to our slow path, implemented by a* subclass. This method is meant to eventually be nonvirtual and* inlinable.*/void write(const uint8_t* buf, uint32_t len) {uint8_t* new_wBase = wBase_ + len;if (TDB_LIKELY(new_wBase <= wBound_)) {std::memcpy(wBase_, buf, len);wBase_ = new_wBase;return;}writeSlow(buf, len);}/*** Fast-path borrow. A lot like the fast-path read.*/const uint8_t* borrow(uint8_t* buf, uint32_t* len) {if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {// With strict aliasing, writing to len shouldn't force us to// refetch rBase_ from memory. TODO(dreiss): Verify this.*len = rBound_ - rBase_;return rBase_;}return borrowSlow(buf, len);}/*** Consume doesn't require a slow path.*/void consume(uint32_t len) {if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {rBase_ += len;} else {throw TTransportException(TTransportException::BAD_ARGS,"consume did not follow a borrow.");}}protected:/// Slow path read.virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;/// Slow path write.virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;/*** Slow path borrow.** POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len*/virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;/*** Trivial constructor.** Initialize pointers safely. 
Constructing is not a very* performance-sensitive operation, so it is okay to just leave it to* the concrete class to set up pointers correctly.*/TBufferBase(): rBase_(NULL), rBound_(NULL), wBase_(NULL), wBound_(NULL){}/// Convenience mutator for setting the read buffer.void setReadBuffer(uint8_t* buf, uint32_t len) {rBase_ = buf;rBound_ = buf+len;}/// Convenience mutator for setting the write buffer.void setWriteBuffer(uint8_t* buf, uint32_t len) {wBase_ = buf;wBound_ = buf+len;}virtual ~TBufferBase() {}/// Reads begin here.uint8_t* rBase_;/// Reads may extend to just before here.uint8_t* rBound_;/// Writes begin here.uint8_t* wBase_;/// Writes may extend to just before here.uint8_t* wBound_;};/*** Base class for all transport which wraps transport to new one.*/class TUnderlyingTransport : public TBufferBase {public:static const int DEFAULT_BUFFER_SIZE = 512;virtual bool peek() {return (rBase_ < rBound_) || transport_->peek();}void open() {transport_->open();}bool isOpen() {return transport_->isOpen();}void close() {flush();transport_->close();}boost::shared_ptr<TTransport> getUnderlyingTransport() {return transport_;}protected:boost::shared_ptr<TTransport> transport_;uint32_t rBufSize_;uint32_t wBufSize_;boost::scoped_array<uint8_t> rBuf_;boost::scoped_array<uint8_t> wBuf_;TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz): transport_(transport), rBufSize_(sz), wBufSize_(sz), rBuf_(new uint8_t[rBufSize_]), wBuf_(new uint8_t[wBufSize_]) {}TUnderlyingTransport(boost::shared_ptr<TTransport> transport): transport_(transport), rBufSize_(DEFAULT_BUFFER_SIZE), wBufSize_(DEFAULT_BUFFER_SIZE), rBuf_(new uint8_t[rBufSize_]), wBuf_(new uint8_t[wBufSize_]) {}TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz): transport_(transport), rBufSize_(rsz), wBufSize_(wsz), rBuf_(new uint8_t[rBufSize_]), wBuf_(new uint8_t[wBufSize_]) {}};/*** Buffered transport. 
For reads it will read more data than is requested* and will serve future data out of a local buffer. For writes, data is* stored to an in memory buffer before being written out.**/class TBufferedTransport : public TUnderlyingTransport {public:/// Use default buffer sizes.TBufferedTransport(boost::shared_ptr<TTransport> transport): TUnderlyingTransport(transport){initPointers();}/// Use specified buffer sizes.TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz): TUnderlyingTransport(transport, sz){initPointers();}/// Use specified read and write buffer sizes.TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz): TUnderlyingTransport(transport, rsz, wsz){initPointers();}virtual bool peek() {/* shigin: see THRIFT-96 discussion */if (rBase_ == rBound_) {setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));}return (rBound_ > rBase_);}virtual uint32_t readSlow(uint8_t* buf, uint32_t len);virtual void writeSlow(const uint8_t* buf, uint32_t len);void flush();/*** The following behavior is currently implemented by TBufferedTransport,* but that may change in a future version:* 1/ If len is at most rBufSize_, borrow will never return NULL.* Depending on the underlying transport, it could throw an exception* or hang forever.* 2/ Some borrow requests may copy bytes internally. However,* if len is at most rBufSize_/2, none of the copied bytes* will ever have to be copied again. 
For optimial performance,* stay under this limit.*/virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);protected:void initPointers() {setReadBuffer(rBuf_.get(), 0);setWriteBuffer(wBuf_.get(), wBufSize_);// Write size never changes.}};/*** Wraps a transport into a buffered one.**/class TBufferedTransportFactory : public TTransportFactory {public:TBufferedTransportFactory() {}virtual ~TBufferedTransportFactory() {}/*** Wraps the transport into a buffered one.*/virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));}};/*** Framed transport. All writes go into an in-memory buffer until flush is* called, at which point the transport writes the length of the entire* binary chunk followed by the data payload. This allows the receiver on the* other end to always do fixed-length reads.**/class TFramedTransport : public TUnderlyingTransport {public:/// Use default buffer sizes.TFramedTransport(boost::shared_ptr<TTransport> transport): TUnderlyingTransport(transport){initPointers();}TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz): TUnderlyingTransport(transport, sz){initPointers();}virtual uint32_t readSlow(uint8_t* buf, uint32_t len);virtual void writeSlow(const uint8_t* buf, uint32_t len);virtual void flush();const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);protected:/*** Reads a frame of input from the underlying stream.*/void readFrame();void initPointers() {setReadBuffer(NULL, 0);setWriteBuffer(wBuf_.get(), wBufSize_);// Pad the buffer so we can insert the size later.int32_t pad = 0;this->write((uint8_t*)&pad, sizeof(pad));}};/*** Wraps a transport into a framed one.**/class TFramedTransportFactory : public TTransportFactory {public:TFramedTransportFactory() {}virtual ~TFramedTransportFactory() {}/*** Wraps the transport into a framed one.*/virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) 
{return boost::shared_ptr<TTransport>(new TFramedTransport(trans));}};/*** A memory buffer is a tranpsort that simply reads from and writes to an* in memory buffer. Anytime you call write on it, the data is simply placed* into a buffer, and anytime you call read, data is read from that buffer.** The buffers are allocated using C constructs malloc,realloc, and the size* doubles as necessary. We've considered using scoped**/class TMemoryBuffer : public TBufferBase {private:// Common initialization done by all constructors.void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {if (buf == NULL && size != 0) {assert(owner);buf = (uint8_t*)std::malloc(size);if (buf == NULL) {throw TTransportException("Out of memory");}}buffer_ = buf;bufferSize_ = size;rBase_ = buffer_;rBound_ = buffer_ + wPos;// TODO(dreiss): Investigate NULL-ing this if !owner.wBase_ = buffer_ + wPos;wBound_ = buffer_ + bufferSize_;owner_ = owner;// rBound_ is really an artifact. In principle, it should always be// equal to wBase_. 
We update it in a few places (computeRead, etc.).}public:static const uint32_t defaultSize = 1024;/*** This enum specifies how a TMemoryBuffer should treat* memory passed to it via constructors or resetBuffer.** OBSERVE:* TMemoryBuffer will simply store a pointer to the memory.* It is the callers responsibility to ensure that the pointer* remains valid for the lifetime of the TMemoryBuffer,* and that it is properly cleaned up.* Note that no data can be written to observed buffers.** COPY:* TMemoryBuffer will make an internal copy of the buffer.* The caller has no responsibilities.** TAKE_OWNERSHIP:* TMemoryBuffer will become the "owner" of the buffer,* and will be responsible for freeing it.* The membory must have been allocated with malloc.*/enum MemoryPolicy{ OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3};/*** Construct a TMemoryBuffer with a default-sized buffer,* owned by the TMemoryBuffer object.*/TMemoryBuffer() {initCommon(NULL, defaultSize, true, 0);}/*** Construct a TMemoryBuffer with a buffer of a specified size,* owned by the TMemoryBuffer object.** @param sz The initial size of the buffer.*/TMemoryBuffer(uint32_t sz) {initCommon(NULL, sz, true, 0);}/*** Construct a TMemoryBuffer with buf as its initial contents.** @param buf The initial contents of the buffer.* Note that, while buf is a non-const pointer,* TMemoryBuffer will not write to it if policy == OBSERVE,* so it is safe to const_cast<uint8_t*>(whatever).* @param sz The size of @c buf.* @param policy See @link MemoryPolicy @endlink .*/TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {if (buf == NULL && sz != 0) {throw TTransportException(TTransportException::BAD_ARGS,"TMemoryBuffer given null buffer with non-zero size.");}switch (policy) {case OBSERVE:case TAKE_OWNERSHIP:initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);break;case COPY:initCommon(NULL, sz, true, 0);this->write(buf, sz);break;default:throw TTransportException(TTransportException::BAD_ARGS,"Invalid MemoryPolicy 
for TMemoryBuffer");}}~TMemoryBuffer() {if (owner_) {std::free(buffer_);}}bool isOpen() {return true;}bool peek() {return (rBase_ < wBase_);}void open() {}void close() {}// TODO(dreiss): Make bufPtr const.void getBuffer(uint8_t** bufPtr, uint32_t* sz) {*bufPtr = rBase_;*sz = wBase_ - rBase_;}std::string getBufferAsString() {if (buffer_ == NULL) {return "";}uint8_t* buf;uint32_t sz;getBuffer(&buf, &sz);return std::string((char*)buf, (std::string::size_type)sz);}void appendBufferToString(std::string& str) {if (buffer_ == NULL) {return;}uint8_t* buf;uint32_t sz;getBuffer(&buf, &sz);str.append((char*)buf, sz);}void resetBuffer(bool reset_capacity = false) {if (reset_capacity){assert(owner_);void* new_buffer = std::realloc(buffer_, defaultSize);if (new_buffer == NULL) {throw TTransportException("Out of memory.");}buffer_ = (uint8_t*) new_buffer;bufferSize_ = defaultSize;wBound_ = buffer_ + bufferSize_;}rBase_ = buffer_;rBound_ = buffer_;wBase_ = buffer_;// It isn't safe to write into a buffer we don't own.if (!owner_) {wBound_ = wBase_;bufferSize_ = 0;}}/// See constructor documentation.void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {// Use a variant of the copy-and-swap trick for assignment operators.// This is sub-optimal in terms of performance for two reasons:// 1/ The constructing and swapping of the (small) values// in the temporary object takes some time, and is not necessary.// 2/ If policy == COPY, we allocate the new buffer before// freeing the old one, precluding the possibility of// reusing that memory.// I doubt that either of these problems could be optimized away,// but the second is probably no a common case, and the first is minor.// I don't expect resetBuffer to be a common operation, so I'm willing to// bite the performance bullet to make the method this simple.// Construct the new buffer.TMemoryBuffer new_buffer(buf, sz, policy);// Move it into ourself.this->swap(new_buffer);// Our old self gets destroyed.}std::string 
readAsString(uint32_t len) {std::string str;(void)readAppendToString(str, len);return str;}uint32_t readAppendToString(std::string& str, uint32_t len);void readEnd() {if (rBase_ == wBase_) {resetBuffer();}}uint32_t available_read() const {// Remember, wBase_ is the real rBound_.return wBase_ - rBase_;}uint32_t available_write() const {return wBound_ - wBase_;}// Returns a pointer to where the client can write data to append to// the TMemoryBuffer, and ensures the buffer is big enough to accomodate a// write of the provided length. The returned pointer is very convenient for// passing to read(), recv(), or similar. You must call wroteBytes() as soon// as data is written or the buffer will not be aware that data has changed.uint8_t* getWritePtr(uint32_t len) {ensureCanWrite(len);return wBase_;}// Informs the buffer that the client has written 'len' bytes into storage// that had been provided by getWritePtr().void wroteBytes(uint32_t len);protected:void swap(TMemoryBuffer& that) {using std::swap;swap(buffer_, that.buffer_);swap(bufferSize_, that.bufferSize_);swap(rBase_, that.rBase_);swap(rBound_, that.rBound_);swap(wBase_, that.wBase_);swap(wBound_, that.wBound_);swap(owner_, that.owner_);}// Make sure there's at least 'len' bytes available for writing.void ensureCanWrite(uint32_t len);// Compute the position and available data for reading.void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);uint32_t readSlow(uint8_t* buf, uint32_t len);void writeSlow(const uint8_t* buf, uint32_t len);const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);// Data bufferuint8_t* buffer_;// Allocated buffer sizeuint32_t bufferSize_;// Is this object the owner of the buffer?bool owner_;// Don't forget to update constrctors, initCommon, and swap if// you add new members.};}}} // apache::thrift::transport#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_