Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <cassert>
21
#include <algorithm>
22
 
23
#include <transport/TBufferTransports.h>
24
 
25
using std::string;
26
 
27
namespace apache { namespace thrift { namespace transport {
28
 
29
 
30
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
31
  uint32_t want = len;
32
  uint32_t have = rBound_ - rBase_;
33
 
34
  // We should only take the slow path if we can't satisfy the read
35
  // with the data already in the buffer.
36
  assert(have < want);
37
 
38
  // Copy out whatever we have.
39
  if (have > 0) {
40
    memcpy(buf, rBase_, have);
41
    want -= have;
42
    buf += have;
43
  }
44
  // Get more from underlying transport up to buffer size.
45
  // Note that this makes a lot of sense if len < rBufSize_
46
  // and almost no sense otherwise.  TODO(dreiss): Fix that
47
  // case (possibly including some readv hotness).
48
  setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
49
 
50
  // Hand over whatever we have.
51
  uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
52
  memcpy(buf, rBase_, give);
53
  rBase_ += give;
54
  want -= give;
55
 
56
  return (len - want);
57
}
58
 
59
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
60
  uint32_t have_bytes = wBase_ - wBuf_.get();
61
  uint32_t space = wBound_ - wBase_;
62
  // We should only take the slow path if we can't accomodate the write
63
  // with the free space already in the buffer.
64
  assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
65
 
66
  // Now here's the tricky question: should we copy data from buf into our
67
  // internal buffer and write it from there, or should we just write out
68
  // the current internal buffer in one syscall and write out buf in another.
69
  // If our currently buffered data plus buf is at least double our buffer
70
  // size, we will have to do two syscalls no matter what (except in the
71
  // degenerate case when our buffer is empty), so there is no use copying.
72
  // Otherwise, there is sort of a sliding scale.  If we have N-1 bytes
73
  // buffered and need to write 2, it would be crazy to do two syscalls.
74
  // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
75
  // we can save a syscall in the short term by loading up our buffer, writing
76
  // it out, and copying the rest of the bytes into our buffer.  Of course,
77
  // if we get another 2-byte write, we haven't saved any syscalls at all,
78
  // and have just copied nearly 2N bytes for nothing.  Finding a perfect
79
  // policy would require predicting the size of future writes, so we're just
80
  // going to always eschew syscalls if we have less than 2N bytes to write.
81
 
82
  // The case where we have to do two syscalls.
83
  // This case also covers the case where the buffer is empty,
84
  // but it is clearer (I think) to think of it as two separate cases.
85
  if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
86
    // TODO(dreiss): writev
87
    if (have_bytes > 0) {
88
      transport_->write(wBuf_.get(), have_bytes);
89
    }
90
    transport_->write(buf, len);
91
    wBase_ = wBuf_.get();
92
    return;
93
  }
94
 
95
  // Fill up our internal buffer for a write.
96
  memcpy(wBase_, buf, space);
97
  buf += space;
98
  len -= space;
99
  transport_->write(wBuf_.get(), wBufSize_);
100
 
101
  // Copy the rest into our buffer.
102
  assert(len < wBufSize_);
103
  memcpy(wBuf_.get(), buf, len);
104
  wBase_ = wBuf_.get() + len;
105
  return;
106
}
107
 
108
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
109
  // If the request is bigger than our buffer, we are hosed.
110
  if (*len > rBufSize_) {
111
    return NULL;
112
  }
113
 
114
  // The number of bytes of data we have already.
115
  uint32_t have = rBound_ - rBase_;
116
  // The number of additional bytes we need from the underlying transport.
117
  int32_t need = *len - have;
118
  // The space from the start of the buffer to the end of our data.
119
  uint32_t offset = rBound_ - rBuf_.get();
120
  assert(need > 0);
121
 
122
  // If we have less than half our buffer space available, shift the data
123
  // we have down to the start.  If the borrow is big compared to our buffer,
124
  // this could be kind of a waste, but if the borrow is small, it frees up
125
  // space at the end of our buffer to do a bigger single read from the
126
  // underlying transport.  Also, if our needs extend past the end of the
127
  // buffer, we have to do a copy no matter what.
128
  if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
129
    memmove(rBuf_.get(), rBase_, have);
130
    setReadBuffer(rBuf_.get(), have);
131
  }
132
 
133
  // First try to fill up the buffer.
134
  uint32_t got = transport_->read(rBound_, rBufSize_ - have);
135
  rBound_ += got;
136
  need -= got;
137
 
138
  // If that fails, readAll until we get what we need.
139
  if (need > 0) {
140
    rBound_ += transport_->readAll(rBound_, need);
141
  }
142
 
143
  *len = rBound_ - rBase_;
144
  return rBase_;
145
}
146
 
147
void TBufferedTransport::flush()  {
148
  // Write out any data waiting in the write buffer.
149
  uint32_t have_bytes = wBase_ - wBuf_.get();
150
  if (have_bytes > 0) {
151
    // Note that we reset wBase_ prior to the underlying write
152
    // to ensure we're in a sane state (i.e. internal buffer cleaned)
153
    // if the underlying write throws up an exception
154
    wBase_ = wBuf_.get();
155
    transport_->write(wBuf_.get(), have_bytes);
156
  }
157
 
158
  // Flush the underlying transport.
159
  transport_->flush();
160
}
161
 
162
 
163
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
164
  uint32_t want = len;
165
  uint32_t have = rBound_ - rBase_;
166
 
167
  // We should only take the slow path if we can't satisfy the read
168
  // with the data already in the buffer.
169
  assert(have < want);
170
 
171
  // Copy out whatever we have.
172
  if (have > 0) {
173
    memcpy(buf, rBase_, have);
174
    want -= have;
175
    buf += have;
176
  }
177
 
178
  // Read another frame.
179
  readFrame();
180
 
181
  // TODO(dreiss): Should we warn when reads cross frames?
182
 
183
  // Hand over whatever we have.
184
  uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
185
  memcpy(buf, rBase_, give);
186
  rBase_ += give;
187
  want -= give;
188
 
189
  return (len - want);
190
}
191
 
192
void TFramedTransport::readFrame() {
193
  // TODO(dreiss): Think about using readv here, even though it would
194
  // result in (gasp) read-ahead.
195
 
196
  // Read the size of the next frame.
197
  int32_t sz;
198
  transport_->readAll((uint8_t*)&sz, sizeof(sz));
199
  sz = ntohl(sz);
200
 
201
  if (sz < 0) {
202
    throw TTransportException("Frame size has negative value");
203
  }
204
 
205
  // Read the frame payload, and reset markers.
206
  if (sz > static_cast<int32_t>(rBufSize_)) {
207
    rBuf_.reset(new uint8_t[sz]);
208
    rBufSize_ = sz;
209
  }
210
  transport_->readAll(rBuf_.get(), sz);
211
  setReadBuffer(rBuf_.get(), sz);
212
}
213
 
214
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
215
  // Double buffer size until sufficient.
216
  uint32_t have = wBase_ - wBuf_.get();
217
  while (wBufSize_ < len + have) {
218
    wBufSize_ *= 2;
219
  }
220
 
221
  // TODO(dreiss): Consider modifying this class to use malloc/free
222
  // so we can use realloc here.
223
 
224
  // Allocate new buffer.
225
  uint8_t* new_buf = new uint8_t[wBufSize_];
226
 
227
  // Copy the old buffer to the new one.
228
  memcpy(new_buf, wBuf_.get(), have);
229
 
230
  // Now point buf to the new one.
231
  wBuf_.reset(new_buf);
232
  wBase_ = wBuf_.get() + have;
233
  wBound_ = wBuf_.get() + wBufSize_;
234
 
235
  // Copy the data into the new buffer.
236
  memcpy(wBase_, buf, len);
237
  wBase_ += len;
238
}
239
 
240
void TFramedTransport::flush()  {
241
  int32_t sz_hbo, sz_nbo;
242
  assert(wBufSize_ > sizeof(sz_nbo));
243
 
244
  // Slip the frame size into the start of the buffer.
245
  sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));
246
  sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
247
  memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
248
 
249
  if (sz_hbo > 0) {
250
    // Note that we reset wBase_ (with a pad for the frame size)
251
    // prior to the underlying write to ensure we're in a sane state
252
    // (i.e. internal buffer cleaned) if the underlying write throws
253
    // up an exception
254
    wBase_ = wBuf_.get() + sizeof(sz_nbo);
255
 
256
    // Write size and frame body.
257
    transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo);
258
  }
259
 
260
  // Flush the underlying transport.
261
  transport_->flush();
262
}
263
 
264
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
265
  // Don't try to be clever with shifting buffers.
266
  // If the fast path failed let the protocol use its slow path.
267
  // Besides, who is going to try to borrow across messages?
268
  return NULL;
269
}
270
 
271
 
272
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
273
  // Correct rBound_ so we can use the fast path in the future.
274
  rBound_ = wBase_;
275
 
276
  // Decide how much to give.
277
  uint32_t give = std::min(len, available_read());
278
 
279
  *out_start = rBase_;
280
  *out_give = give;
281
 
282
  // Preincrement rBase_ so the caller doesn't have to.
283
  rBase_ += give;
284
}
285
 
286
uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
287
  uint8_t* start;
288
  uint32_t give;
289
  computeRead(len, &start, &give);
290
 
291
  // Copy into the provided buffer.
292
  memcpy(buf, start, give);
293
 
294
  return give;
295
}
296
 
297
uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
298
  // Don't get some stupid assertion failure.
299
  if (buffer_ == NULL) {
300
    return 0;
301
  }
302
 
303
  uint8_t* start;
304
  uint32_t give;
305
  computeRead(len, &start, &give);
306
 
307
  // Append to the provided string.
308
  str.append((char*)start, give);
309
 
310
  return give;
311
}
312
 
313
void TMemoryBuffer::ensureCanWrite(uint32_t len) {
314
  // Check available space
315
  uint32_t avail = available_write();
316
  if (len <= avail) {
317
    return;
318
  }
319
 
320
  if (!owner_) {
321
    throw TTransportException("Insufficient space in external MemoryBuffer");
322
  }
323
 
324
  // Grow the buffer as necessary.
325
  while (len > avail) {
326
    bufferSize_ *= 2;
327
    wBound_ = buffer_ + bufferSize_;
328
    avail = available_write();
329
  }
330
 
331
  // Allocate into a new pointer so we don't bork ours if it fails.
332
  void* new_buffer = std::realloc(buffer_, bufferSize_);
333
  if (new_buffer == NULL) {
334
    throw TTransportException("Out of memory.");
335
  }
336
 
337
  ptrdiff_t offset = (uint8_t*)new_buffer - buffer_;
338
  buffer_ += offset;
339
  rBase_ += offset;
340
  rBound_ += offset;
341
  wBase_ += offset;
342
  wBound_ += offset;
343
}
344
 
345
void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
346
  ensureCanWrite(len);
347
 
348
  // Copy into the buffer and increment wBase_.
349
  memcpy(wBase_, buf, len);
350
  wBase_ += len;
351
}
352
 
353
void TMemoryBuffer::wroteBytes(uint32_t len) {
354
  uint32_t avail = available_write();
355
  if (len > avail) {
356
    throw TTransportException("Client wrote more bytes than size of buffer.");
357
  }
358
  wBase_ += len;
359
}
360
 
361
const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
362
  rBound_ = wBase_;
363
  if (available_read() >= *len) {
364
    *len = available_read();
365
    return rBase_;
366
  }
367
  return NULL;
368
}
369
 
370
}}} // apache::thrift::transport