Subversion Repositories SmartDukaan

Rev

Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <cassert>
#include <algorithm>

#include <transport/TBufferTransports.h>

using std::string;

namespace apache { namespace thrift { namespace transport {


/**
 * Slow-path read, used when the bytes left in the read buffer cannot
 * satisfy the whole request.
 *
 * @param buf  destination for the bytes read
 * @param len  number of bytes requested; must exceed what is buffered
 * @return the number of bytes actually copied into buf, which may be
 *         less than len
 */
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
  uint32_t have = rBound_ - rBase_;

  // We should only take the slow path if we can't satisfy the read
  // with the data already in the buffer.
  assert(have < len);

  // If we have some data in the buffer, copy it out and return it.
  // We must not go on to read from the underlying transport here: there
  // is no guarantee that it has more data available, so that read could
  // block even though we already have bytes to give the caller.
  if (have > 0) {
    memcpy(buf, rBase_, have);
    // Everything buffered has been handed over; mark the buffer empty.
    setReadBuffer(rBuf_.get(), 0);
    return have;
  }

  // The buffer is empty.
  // Get more from underlying transport up to buffer size.
  // Note that this makes a lot of sense if len < rBufSize_
  // and almost no sense otherwise.  TODO(dreiss): Fix that
  // case (possibly including some readv hotness).
  setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));

  // Hand over whatever we have.
  uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
  memcpy(buf, rBase_, give);
  rBase_ += give;

  return give;
}

/**
 * Slow-path write, used when the free space in the write buffer cannot
 * hold the whole request.
 *
 * Either sends the buffered bytes and buf to the underlying transport as
 * two writes, or tops up the buffer, sends it as one write and keeps the
 * remainder of buf buffered.
 *
 * @param buf  bytes to write
 * @param len  number of bytes in buf; must exceed the free buffer space
 */
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
  uint32_t have_bytes = wBase_ - wBuf_.get();
  uint32_t space = wBound_ - wBase_;
  // We should only take the slow path if we can't accommodate the write
  // with the free space already in the buffer.
  assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));

  // Now here's the tricky question: should we copy data from buf into our
  // internal buffer and write it from there, or should we just write out
  // the current internal buffer in one syscall and write out buf in another.
  // If our currently buffered data plus buf is at least double our buffer
  // size, we will have to do two syscalls no matter what (except in the
  // degenerate case when our buffer is empty), so there is no use copying.
  // Otherwise, there is sort of a sliding scale.  If we have N-1 bytes
  // buffered and need to write 2, it would be crazy to do two syscalls.
  // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
  // we can save a syscall in the short term by loading up our buffer, writing
  // it out, and copying the rest of the bytes into our buffer.  Of course,
  // if we get another 2-byte write, we haven't saved any syscalls at all,
  // and have just copied nearly 2N bytes for nothing.  Finding a perfect
  // policy would require predicting the size of future writes, so we're just
  // going to always eschew syscalls if we have less than 2N bytes to write.

  // Do the size comparison in 64 bits: have_bytes + len (and 2*wBufSize_)
  // can wrap around in 32 bits, which would wrongly select the copy path
  // below for a huge write and overrun the internal buffer.
  const uint64_t total_bytes = static_cast<uint64_t>(have_bytes) + len;
  const uint64_t double_buf = 2 * static_cast<uint64_t>(wBufSize_);

  // The case where we have to do two syscalls.
  // This case also covers the case where the buffer is empty,
  // but it is clearer (I think) to think of it as two separate cases.
  if ((total_bytes >= double_buf) || (have_bytes == 0)) {
    // As in flush(), reset wBase_ before the underlying writes so that an
    // exception cannot leave already-transmitted bytes queued for resend.
    wBase_ = wBuf_.get();
    // TODO(dreiss): writev
    if (have_bytes > 0) {
      transport_->write(wBuf_.get(), have_bytes);
    }
    transport_->write(buf, len);
    return;
  }

  // Fill up our internal buffer for a write.
  memcpy(wBase_, buf, space);
  buf += space;
  len -= space;
  // Mark the buffer empty before handing it to the transport (see above).
  wBase_ = wBuf_.get();
  transport_->write(wBuf_.get(), wBufSize_);

  // Copy the rest into our buffer.
  assert(len < wBufSize_);
  memcpy(wBuf_.get(), buf, len);
  wBase_ = wBuf_.get() + len;
  return;
}

/**
 * Slow-path borrow: pulls enough data from the underlying transport into
 * the read buffer to expose at least *len contiguous bytes.
 *
 * @param buf  caller-supplied scratch space; not used by this implementation
 * @param len  in: number of bytes wanted; out: number of bytes available
 *             at the returned pointer
 * @return pointer to the readable bytes inside the read buffer, or NULL
 *         when the request is larger than the read buffer
 */
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
  (void)buf;

  // If the request is bigger than our buffer, we are hosed.
  if (*len > rBufSize_) {
    return NULL;
  }

  // The number of bytes of data we have already.
  uint32_t have = rBound_ - rBase_;
  // The number of additional bytes we need from the underlying transport.
  int32_t need = *len - have;
  // The space from the start of the buffer to the end of our data.
  uint32_t offset = rBound_ - rBuf_.get();
  assert(need > 0);

  // If we have less than half our buffer space available, shift the data
  // we have down to the start.  If the borrow is big compared to our buffer,
  // this could be kind of a waste, but if the borrow is small, it frees up
  // space at the end of our buffer to do a bigger single read from the
  // underlying transport.  Also, if our needs extend past the end of the
  // buffer, we have to do a copy no matter what.
  if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
    memmove(rBuf_.get(), rBase_, have);
    setReadBuffer(rBuf_.get(), have);
    // The data now starts at the front of the buffer, so it ends at `have`.
    offset = have;
  }

  // First try to fill up the buffer.  Only the region between rBound_ and
  // the end of the buffer is free, i.e. rBufSize_ - offset bytes.  Asking
  // for rBufSize_ - have would write past the end of the buffer whenever
  // the data was not shifted down and rBase_ is not at the buffer start.
  uint32_t got = transport_->read(rBound_, rBufSize_ - offset);
  rBound_ += got;
  need -= got;

  // If that fails, readAll until we get what we need.
  if (need > 0) {
    rBound_ += transport_->readAll(rBound_, need);
  }

  *len = rBound_ - rBase_;
  return rBase_;
}

/**
 * Sends any bytes pending in the write buffer to the underlying
 * transport and then flushes that transport.
 */
void TBufferedTransport::flush()  {
  uint8_t* const start = wBuf_.get();
  const uint32_t pending = wBase_ - start;

  if (pending > 0) {
    // Mark the buffer empty *before* writing: should the underlying write
    // throw, the internal buffer is still left in a clean state.
    wBase_ = start;
    transport_->write(start, pending);
  }

  // The underlying transport is flushed even when nothing was pending.
  transport_->flush();
}


uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
  uint32_t want = len;
  uint32_t have = rBound_ - rBase_;

  // We should only take the slow path if we can't satisfy the read
  // with the data already in the buffer.
  assert(have < want);

  // Copy out whatever we have.
  if (have > 0) {
    memcpy(buf, rBase_, have);
    want -= have;
    buf += have;
  }

  // Read another frame.
  readFrame();

  // TODO(dreiss): Should we warn when reads cross frames?

  // Hand over whatever we have.
  uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
  memcpy(buf, rBase_, give);
  rBase_ += give;
  want -= give;

  return (len - want);
}

/**
 * Reads one complete frame (4-byte network-order length followed by the
 * payload) from the underlying transport into the read buffer, growing
 * the buffer when the frame does not fit.
 *
 * @throws TTransportException if the frame length is negative; anything
 *         thrown by the underlying transport's readAll propagates.
 */
void TFramedTransport::readFrame() {
  // TODO(dreiss): Think about using readv here, even though it would
  // result in (gasp) read-ahead.

  // Read the size of the next frame.
  uint32_t sz_nbo = 0;
  transport_->readAll(reinterpret_cast<uint8_t*>(&sz_nbo), sizeof(sz_nbo));
  const int32_t sz = static_cast<int32_t>(ntohl(sz_nbo));

  if (sz < 0) {
    throw TTransportException("Frame size has negative value");
  }

  // Make sure the buffer can hold the whole payload.
  if (sz > static_cast<int32_t>(rBufSize_)) {
    rBuf_.reset(new uint8_t[sz]);
    rBufSize_ = sz;
  }

  // From here on the old contents of the buffer are gone: either the
  // storage was just replaced, or the payload read below overwrites it.
  // Empty the read window first so that, if the payload read throws, the
  // read markers do not refer to freed or half-overwritten memory.
  setReadBuffer(rBuf_.get(), 0);

  // Read the frame payload, and reset markers.
  transport_->readAll(rBuf_.get(), sz);
  setReadBuffer(rBuf_.get(), sz);
}

void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
  // Double buffer size until sufficient.
  uint32_t have = wBase_ - wBuf_.get();
  while (wBufSize_ < len + have) {
    wBufSize_ *= 2;
  }

  // TODO(dreiss): Consider modifying this class to use malloc/free
  // so we can use realloc here.

  // Allocate new buffer.
  uint8_t* new_buf = new uint8_t[wBufSize_];

  // Copy the old buffer to the new one.
  memcpy(new_buf, wBuf_.get(), have);

  // Now point buf to the new one.
  wBuf_.reset(new_buf);
  wBase_ = wBuf_.get() + have;
  wBound_ = wBuf_.get() + wBufSize_;

  // Copy the data into the new buffer.
  memcpy(wBase_, buf, len);
  wBase_ += len;
}

/**
 * Stamps the pending frame with its length, sends it to the underlying
 * transport when it has a non-empty body, and flushes that transport.
 */
void TFramedTransport::flush()  {
  assert(wBufSize_ > sizeof(int32_t));

  uint8_t* const frame = wBuf_.get();
  uint8_t* const body = frame + sizeof(int32_t);

  // Body length in host byte order, and the same value in network order.
  const int32_t body_len = wBase_ - body;
  const int32_t wire_len =
      static_cast<int32_t>(htonl(static_cast<uint32_t>(body_len)));

  // The length prefix occupies the bytes reserved at the buffer start.
  memcpy(frame, reinterpret_cast<const uint8_t*>(&wire_len), sizeof(wire_len));

  if (body_len > 0) {
    // Rewind wBase_ to just past the length prefix *before* writing, so
    // the internal buffer is clean even if the underlying write throws.
    wBase_ = body;

    // Length prefix and body go out in a single write.
    transport_->write(frame, sizeof(wire_len) + body_len);
  }

  // The underlying transport is flushed even when no frame was sent.
  transport_->flush();
}

/**
 * Slow-path borrow for framed transports: always declines.
 *
 * @param buf  unused
 * @param len  unused; left unmodified
 * @return NULL, always
 */
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
  (void)buf;
  (void)len;
  // No buffer shuffling is attempted here.  When the fast path cannot
  // serve a borrow, the protocol is expected to fall back to its own slow
  // path; borrowing across messages is not supported.
  return NULL;
}


/**
 * Works out which bytes a read of up to len bytes should return, and
 * consumes them.
 *
 * @param len        maximum number of bytes wanted
 * @param out_start  receives a pointer to the first byte to hand out
 * @param out_give   receives the number of bytes to hand out
 */
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
  // Bring rBound_ up to the write position so that later reads can be
  // served by the fast path.
  rBound_ = wBase_;

  // Hand out the smaller of the request and what is readable.
  const uint32_t readable = available_read();
  const uint32_t count = (readable < len) ? readable : len;

  *out_start = rBase_;
  *out_give = count;

  // Advance the read position here so that callers need not do it.
  rBase_ += count;
}

/**
 * Slow-path read: copies up to len bytes out of the memory buffer.
 *
 * @param buf  destination for the bytes read
 * @param len  maximum number of bytes to copy
 * @return the number of bytes copied into buf
 */
uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
  uint8_t* src = NULL;
  uint32_t count = 0;

  // computeRead picks the source range and advances the read position.
  computeRead(len, &src, &count);
  memcpy(buf, src, count);

  return count;
}

/**
 * Reads up to len bytes from the memory buffer and appends them to str.
 *
 * @param str  string to append the bytes to
 * @param len  maximum number of bytes to read
 * @return the number of bytes appended; 0 when there is no buffer
 */
uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
  if (buffer_ != NULL) {
    uint8_t* src = NULL;
    uint32_t count = 0;

    // computeRead picks the source range and advances the read position.
    computeRead(len, &src, &count);
    str.append(reinterpret_cast<char*>(src), count);

    return count;
  }

  // Without a buffer there is nothing to read; bail out quietly instead
  // of running into an assertion failure.
  return 0;
}

/**
 * Makes sure at least len bytes can be written at wBase_, growing the
 * buffer by repeated doubling when it is owned by this object.
 *
 * On failure no member is modified, so the buffer stays usable at its
 * previous size.
 *
 * @param len  number of bytes about to be written
 * @throws TTransportException if the buffer is not owned and too small,
 *         if the required size does not fit in 32 bits, or if the
 *         reallocation fails
 */
void TMemoryBuffer::ensureCanWrite(uint32_t len) {
  // Check available space
  uint32_t avail = available_write();
  if (len <= avail) {
    return;
  }

  if (!owner_) {
    throw TTransportException("Insufficient space in external MemoryBuffer");
  }

  // Grow the buffer as necessary.  The new size is worked out in locals
  // only: bufferSize_ and wBound_ must not change until the reallocation
  // has actually succeeded.
  // NOTE(review): assumes every byte added to the buffer adds one byte to
  // available_write(), i.e. wBound_ sits at buffer_ + bufferSize_ on entry.
  uint32_t new_size = bufferSize_;
  while (len > avail) {
    // Start from 1 so that a zero-sized buffer cannot stall the loop.
    const uint32_t grown = (new_size > 0) ? new_size * 2 : 1;
    if (grown <= new_size) {
      // Doubling wrapped around 32 bits.
      throw TTransportException("Internal buffer size overflow");
    }
    avail += grown - new_size;
    new_size = grown;
  }

  // Record the marker positions relative to the buffer start while
  // buffer_ is still valid; after a successful realloc the old pointer
  // must not be used any more.
  const ptrdiff_t rBaseOffset = rBase_ - buffer_;
  const ptrdiff_t rBoundOffset = rBound_ - buffer_;
  const ptrdiff_t wBaseOffset = wBase_ - buffer_;

  // Allocate into a new pointer so we don't bork ours if it fails.
  uint8_t* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, new_size));
  if (new_buffer == NULL) {
    throw TTransportException("Out of memory.");
  }

  // Commit: rebase every marker onto the (possibly moved) buffer.
  buffer_ = new_buffer;
  bufferSize_ = new_size;
  rBase_ = new_buffer + rBaseOffset;
  rBound_ = new_buffer + rBoundOffset;
  wBase_ = new_buffer + wBaseOffset;
  wBound_ = new_buffer + new_size;
}

/**
 * Slow-path write: makes room for len bytes if necessary, then appends
 * buf at the write position.
 *
 * @param buf  bytes to append
 * @param len  number of bytes in buf
 */
void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
  // May change wBase_, so it must run before the destination is taken.
  ensureCanWrite(len);

  uint8_t* const dest = wBase_;
  memcpy(dest, buf, len);
  wBase_ = dest + len;
}

/**
 * Advances the write position by len bytes.
 *
 * @param len  number of bytes to advance by
 * @throws TTransportException if len exceeds the writable space
 */
void TMemoryBuffer::wroteBytes(uint32_t len) {
  const uint32_t room = available_write();
  if (room < len) {
    throw TTransportException("Client wrote more bytes than size of buffer.");
  }

  wBase_ += len;
}

/**
 * Slow-path borrow: exposes the readable bytes in place when there are
 * at least *len of them.
 *
 * @param buf  unused
 * @param len  in: number of bytes wanted; out (on success): number of
 *             bytes readable at the returned pointer
 * @return pointer to the readable bytes, or NULL when fewer than *len
 *         bytes are readable (*len is then left unchanged)
 */
const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
  (void)buf;

  // Bring rBound_ up to the write position before measuring.
  rBound_ = wBase_;

  if (available_read() < *len) {
    return NULL;
  }

  *len = available_read();
  return rBase_;
}

}}} // apache::thrift::transport