Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
21
#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
22
 
23
#include <cstring>
24
#include "boost/scoped_array.hpp"
25
 
26
#include <transport/TTransport.h>
27
 
28
// Branch-prediction hints used by the buffer fast paths.  Compilers that
// define __GNUC__ get __builtin_expect; any other compiler simply sees the
// bare expression.
#if defined(__GNUC__)
#  define TDB_LIKELY(val) (__builtin_expect((val), 1))
#  define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
#else
#  define TDB_LIKELY(val) (val)
#  define TDB_UNLIKELY(val) (val)
#endif
35
 
36
namespace apache { namespace thrift { namespace transport {
37
 
38
 
39
/**
40
 * Base class for all transports that use read/write buffers for performance.
41
 *
42
 * TBufferBase is designed to implement the fast-path "memcpy" style
43
 * operations that work in the common case.  It does so with small and
44
 * (eventually) nonvirtual, inlinable methods.  TBufferBase is an abstract
45
 * class.  Subclasses are expected to define the "slow path" operations
46
 * that have to be done when the buffers are full or empty.
47
 *
48
 */
49
class TBufferBase : public TTransport {
50
 
51
 public:
52
 
53
  /**
54
   * Fast-path read.
55
   *
56
   * When we have enough data buffered to fulfill the read, we can satisfy it
57
   * with a single memcpy, then adjust our internal pointers.  If the buffer
58
   * is empty, we call out to our slow path, implemented by a subclass.
59
   * This method is meant to eventually be nonvirtual and inlinable.
60
   */
61
  uint32_t read(uint8_t* buf, uint32_t len) {
62
    uint8_t* new_rBase = rBase_ + len;
63
    if (TDB_LIKELY(new_rBase <= rBound_)) {
64
      std::memcpy(buf, rBase_, len);
65
      rBase_ = new_rBase;
66
      return len;
67
    }
68
    return readSlow(buf, len);
69
  }
70
 
71
  /**
72
   * Fast-path write.
73
   *
74
   * When we have enough empty space in our buffer to accomodate the write, we
75
   * can satisfy it with a single memcpy, then adjust our internal pointers.
76
   * If the buffer is full, we call out to our slow path, implemented by a
77
   * subclass.  This method is meant to eventually be nonvirtual and
78
   * inlinable.
79
   */
80
  void write(const uint8_t* buf, uint32_t len) {
81
    uint8_t* new_wBase = wBase_ + len;
82
    if (TDB_LIKELY(new_wBase <= wBound_)) {
83
      std::memcpy(wBase_, buf, len);
84
      wBase_ = new_wBase;
85
      return;
86
    }
87
    writeSlow(buf, len);
88
  }
89
 
90
  /**
91
   * Fast-path borrow.  A lot like the fast-path read.
92
   */
93
  const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
94
    if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
95
      // With strict aliasing, writing to len shouldn't force us to
96
      // refetch rBase_ from memory.  TODO(dreiss): Verify this.
97
      *len = rBound_ - rBase_;
98
      return rBase_;
99
    }
100
    return borrowSlow(buf, len);
101
  }
102
 
103
  /**
104
   * Consume doesn't require a slow path.
105
   */
106
  void consume(uint32_t len) {
107
    if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
108
      rBase_ += len;
109
    } else {
110
      throw TTransportException(TTransportException::BAD_ARGS,
111
                                "consume did not follow a borrow.");
112
    }
113
  }
114
 
115
 
116
 protected:
117
 
118
  /// Slow path read.
119
  virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
120
 
121
  /// Slow path write.
122
  virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
123
 
124
  /**
125
   * Slow path borrow.
126
   *
127
   * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len
128
   */
129
  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
130
 
131
  /**
132
   * Trivial constructor.
133
   *
134
   * Initialize pointers safely.  Constructing is not a very
135
   * performance-sensitive operation, so it is okay to just leave it to
136
   * the concrete class to set up pointers correctly.
137
   */
138
  TBufferBase()
139
    : rBase_(NULL)
140
    , rBound_(NULL)
141
    , wBase_(NULL)
142
    , wBound_(NULL)
143
  {}
144
 
145
  /// Convenience mutator for setting the read buffer.
146
  void setReadBuffer(uint8_t* buf, uint32_t len) {
147
    rBase_ = buf;
148
    rBound_ = buf+len;
149
  }
150
 
151
  /// Convenience mutator for setting the write buffer.
152
  void setWriteBuffer(uint8_t* buf, uint32_t len) {
153
    wBase_ = buf;
154
    wBound_ = buf+len;
155
  }
156
 
157
  virtual ~TBufferBase() {}
158
 
159
  /// Reads begin here.
160
  uint8_t* rBase_;
161
  /// Reads may extend to just before here.
162
  uint8_t* rBound_;
163
 
164
  /// Writes begin here.
165
  uint8_t* wBase_;
166
  /// Writes may extend to just before here.
167
  uint8_t* wBound_;
168
};
169
 
170
 
171
/** 
172
 * Base class for all transport which wraps transport to new one.
173
 */
174
class TUnderlyingTransport : public TBufferBase {
175
 public:
176
  static const int DEFAULT_BUFFER_SIZE = 512;
177
 
178
  virtual bool peek() {
179
    return (rBase_ < rBound_) || transport_->peek();
180
  }
181
 
182
  void open() {
183
    transport_->open();
184
  }
185
 
186
  bool isOpen() {
187
    return transport_->isOpen();
188
  }
189
 
190
  void close() {
191
    flush();
192
    transport_->close();
193
  }
194
 
195
  boost::shared_ptr<TTransport> getUnderlyingTransport() {
196
    return transport_;
197
  }
198
 
199
 protected:
200
  boost::shared_ptr<TTransport> transport_;
201
 
202
  uint32_t rBufSize_;
203
  uint32_t wBufSize_;
204
  boost::scoped_array<uint8_t> rBuf_;
205
  boost::scoped_array<uint8_t> wBuf_;
206
 
207
  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
208
    : transport_(transport)
209
    , rBufSize_(sz)
210
    , wBufSize_(sz)
211
    , rBuf_(new uint8_t[rBufSize_])
212
    , wBuf_(new uint8_t[wBufSize_]) {}
213
 
214
  TUnderlyingTransport(boost::shared_ptr<TTransport> transport)
215
    : transport_(transport)
216
    , rBufSize_(DEFAULT_BUFFER_SIZE)
217
    , wBufSize_(DEFAULT_BUFFER_SIZE)
218
    , rBuf_(new uint8_t[rBufSize_])
219
    , wBuf_(new uint8_t[wBufSize_]) {}
220
 
221
  TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
222
    : transport_(transport)
223
    , rBufSize_(rsz)
224
    , wBufSize_(wsz)
225
    , rBuf_(new uint8_t[rBufSize_])
226
    , wBuf_(new uint8_t[wBufSize_]) {}
227
};
228
 
229
/**
230
 * Buffered transport. For reads it will read more data than is requested
231
 * and will serve future data out of a local buffer. For writes, data is
232
 * stored to an in memory buffer before being written out.
233
 *
234
 */
235
class TBufferedTransport : public TUnderlyingTransport {
236
 public:
237
 
238
  /// Use default buffer sizes.
239
  TBufferedTransport(boost::shared_ptr<TTransport> transport)
240
    : TUnderlyingTransport(transport)
241
  {
242
    initPointers();
243
  }
244
 
245
  /// Use specified buffer sizes.
246
  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
247
    : TUnderlyingTransport(transport, sz)
248
  {
249
    initPointers();
250
  }
251
 
252
  /// Use specified read and write buffer sizes.
253
  TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz)
254
    : TUnderlyingTransport(transport, rsz, wsz)
255
  {
256
    initPointers();
257
  }
258
 
259
  virtual bool peek() {
260
    /* shigin: see THRIFT-96 discussion */
261
    if (rBase_ == rBound_) {
262
      setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
263
    }
264
    return (rBound_ > rBase_);
265
  }
266
  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
267
 
268
  virtual void writeSlow(const uint8_t* buf, uint32_t len);
269
 
270
  void flush();
271
 
272
 
273
  /**
274
   * The following behavior is currently implemented by TBufferedTransport,
275
   * but that may change in a future version:
276
   * 1/ If len is at most rBufSize_, borrow will never return NULL.
277
   *    Depending on the underlying transport, it could throw an exception
278
   *    or hang forever.
279
   * 2/ Some borrow requests may copy bytes internally.  However,
280
   *    if len is at most rBufSize_/2, none of the copied bytes
281
   *    will ever have to be copied again.  For optimial performance,
282
   *    stay under this limit.
283
   */
284
  virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
285
 
286
 protected:
287
  void initPointers() {
288
    setReadBuffer(rBuf_.get(), 0);
289
    setWriteBuffer(wBuf_.get(), wBufSize_);
290
    // Write size never changes.
291
  }
292
};
293
 
294
 
295
/**
296
 * Wraps a transport into a buffered one.
297
 *
298
 */
299
class TBufferedTransportFactory : public TTransportFactory {
300
 public:
301
  TBufferedTransportFactory() {}
302
 
303
  virtual ~TBufferedTransportFactory() {}
304
 
305
  /**
306
   * Wraps the transport into a buffered one.
307
   */
308
  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
309
    return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
310
  }
311
 
312
};
313
 
314
 
315
/**
316
 * Framed transport. All writes go into an in-memory buffer until flush is
317
 * called, at which point the transport writes the length of the entire
318
 * binary chunk followed by the data payload. This allows the receiver on the
319
 * other end to always do fixed-length reads.
320
 *
321
 */
322
class TFramedTransport : public TUnderlyingTransport {
323
 public:
324
 
325
  /// Use default buffer sizes.
326
  TFramedTransport(boost::shared_ptr<TTransport> transport)
327
    : TUnderlyingTransport(transport)
328
  {
329
    initPointers();
330
  }
331
 
332
  TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz)
333
    : TUnderlyingTransport(transport, sz)
334
  {
335
    initPointers();
336
  }
337
 
338
  virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
339
 
340
  virtual void writeSlow(const uint8_t* buf, uint32_t len);
341
 
342
  virtual void flush();
343
 
344
  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
345
 
346
 protected:
347
  /**
348
   * Reads a frame of input from the underlying stream.
349
   */
350
  void readFrame();
351
 
352
  void initPointers() {
353
    setReadBuffer(NULL, 0);
354
    setWriteBuffer(wBuf_.get(), wBufSize_);
355
 
356
    // Pad the buffer so we can insert the size later.
357
    int32_t pad = 0;
358
    this->write((uint8_t*)&pad, sizeof(pad));
359
  }
360
};
361
 
362
/**
363
 * Wraps a transport into a framed one.
364
 *
365
 */
366
class TFramedTransportFactory : public TTransportFactory {
367
 public:
368
  TFramedTransportFactory() {}
369
 
370
  virtual ~TFramedTransportFactory() {}
371
 
372
  /**
373
   * Wraps the transport into a framed one.
374
   */
375
  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
376
    return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
377
  }
378
 
379
};
380
 
381
 
382
/**
383
 * A memory buffer is a tranpsort that simply reads from and writes to an
384
 * in memory buffer. Anytime you call write on it, the data is simply placed
385
 * into a buffer, and anytime you call read, data is read from that buffer.
386
 *
387
 * The buffers are allocated using C constructs malloc,realloc, and the size
388
 * doubles as necessary.  We've considered using scoped
389
 *
390
 */
391
class TMemoryBuffer : public TBufferBase {
392
 private:
393
 
394
  // Common initialization done by all constructors.
395
  void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
396
    if (buf == NULL && size != 0) {
397
      assert(owner);
398
      buf = (uint8_t*)std::malloc(size);
399
      if (buf == NULL) {
400
        throw TTransportException("Out of memory");
401
      }
402
    }
403
 
404
    buffer_ = buf;
405
    bufferSize_ = size;
406
 
407
    rBase_ = buffer_;
408
    rBound_ = buffer_ + wPos;
409
    // TODO(dreiss): Investigate NULL-ing this if !owner.
410
    wBase_ = buffer_ + wPos;
411
    wBound_ = buffer_ + bufferSize_;
412
 
413
    owner_ = owner;
414
 
415
    // rBound_ is really an artifact.  In principle, it should always be
416
    // equal to wBase_.  We update it in a few places (computeRead, etc.).
417
  }
418
 
419
 public:
420
  static const uint32_t defaultSize = 1024;
421
 
422
  /**
423
   * This enum specifies how a TMemoryBuffer should treat
424
   * memory passed to it via constructors or resetBuffer.
425
   *
426
   * OBSERVE:
427
   *   TMemoryBuffer will simply store a pointer to the memory.
428
   *   It is the callers responsibility to ensure that the pointer
429
   *   remains valid for the lifetime of the TMemoryBuffer,
430
   *   and that it is properly cleaned up.
431
   *   Note that no data can be written to observed buffers.
432
   *
433
   * COPY:
434
   *   TMemoryBuffer will make an internal copy of the buffer.
435
   *   The caller has no responsibilities.
436
   *
437
   * TAKE_OWNERSHIP:
438
   *   TMemoryBuffer will become the "owner" of the buffer,
439
   *   and will be responsible for freeing it.
440
   *   The membory must have been allocated with malloc.
441
   */
442
  enum MemoryPolicy
443
  { OBSERVE = 1
444
  , COPY = 2
445
  , TAKE_OWNERSHIP = 3
446
  };
447
 
448
  /**
449
   * Construct a TMemoryBuffer with a default-sized buffer,
450
   * owned by the TMemoryBuffer object.
451
   */
452
  TMemoryBuffer() {
453
    initCommon(NULL, defaultSize, true, 0);
454
  }
455
 
456
  /**
457
   * Construct a TMemoryBuffer with a buffer of a specified size,
458
   * owned by the TMemoryBuffer object.
459
   *
460
   * @param sz  The initial size of the buffer.
461
   */
462
  TMemoryBuffer(uint32_t sz) {
463
    initCommon(NULL, sz, true, 0);
464
  }
465
 
466
  /**
467
   * Construct a TMemoryBuffer with buf as its initial contents.
468
   *
469
   * @param buf    The initial contents of the buffer.
470
   *               Note that, while buf is a non-const pointer,
471
   *               TMemoryBuffer will not write to it if policy == OBSERVE,
472
   *               so it is safe to const_cast<uint8_t*>(whatever).
473
   * @param sz     The size of @c buf.
474
   * @param policy See @link MemoryPolicy @endlink .
475
   */
476
  TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
477
    if (buf == NULL && sz != 0) {
478
      throw TTransportException(TTransportException::BAD_ARGS,
479
                                "TMemoryBuffer given null buffer with non-zero size.");
480
    }
481
 
482
    switch (policy) {
483
      case OBSERVE:
484
      case TAKE_OWNERSHIP:
485
        initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
486
        break;
487
      case COPY:
488
        initCommon(NULL, sz, true, 0);
489
        this->write(buf, sz);
490
        break;
491
      default:
492
        throw TTransportException(TTransportException::BAD_ARGS,
493
                                  "Invalid MemoryPolicy for TMemoryBuffer");
494
    }
495
  }
496
 
497
  ~TMemoryBuffer() {
498
    if (owner_) {
499
      std::free(buffer_);
500
    }
501
  }
502
 
503
  bool isOpen() {
504
    return true;
505
  }
506
 
507
  bool peek() {
508
    return (rBase_ < wBase_);
509
  }
510
 
511
  void open() {}
512
 
513
  void close() {}
514
 
515
  // TODO(dreiss): Make bufPtr const.
516
  void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
517
    *bufPtr = rBase_;
518
    *sz = wBase_ - rBase_;
519
  }
520
 
521
  std::string getBufferAsString() {
522
    if (buffer_ == NULL) {
523
      return "";
524
    }
525
    uint8_t* buf;
526
    uint32_t sz;
527
    getBuffer(&buf, &sz);
528
    return std::string((char*)buf, (std::string::size_type)sz);
529
  }
530
 
531
  void appendBufferToString(std::string& str) {
532
    if (buffer_ == NULL) {
533
      return;
534
    }
535
    uint8_t* buf;
536
    uint32_t sz;
537
    getBuffer(&buf, &sz);
538
    str.append((char*)buf, sz);
539
  }
540
 
541
  void resetBuffer(bool reset_capacity = false) {
542
    if (reset_capacity)
543
    {
544
      assert(owner_);
545
 
546
      void* new_buffer = std::realloc(buffer_, defaultSize);
547
 
548
      if (new_buffer == NULL) {
549
        throw TTransportException("Out of memory.");
550
      }
551
 
552
      buffer_ = (uint8_t*) new_buffer;
553
      bufferSize_ = defaultSize;
554
 
555
      wBound_ = buffer_ + bufferSize_;
556
    }
557
 
558
    rBase_ = buffer_;
559
    rBound_ = buffer_;
560
    wBase_ = buffer_;
561
    // It isn't safe to write into a buffer we don't own.
562
    if (!owner_) {
563
      wBound_ = wBase_;
564
      bufferSize_ = 0;
565
    }
566
  }
567
 
568
  /// See constructor documentation.
569
  void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
570
    // Use a variant of the copy-and-swap trick for assignment operators.
571
    // This is sub-optimal in terms of performance for two reasons:
572
    //   1/ The constructing and swapping of the (small) values
573
    //      in the temporary object takes some time, and is not necessary.
574
    //   2/ If policy == COPY, we allocate the new buffer before
575
    //      freeing the old one, precluding the possibility of
576
    //      reusing that memory.
577
    // I doubt that either of these problems could be optimized away,
578
    // but the second is probably no a common case, and the first is minor.
579
    // I don't expect resetBuffer to be a common operation, so I'm willing to
580
    // bite the performance bullet to make the method this simple.
581
 
582
    // Construct the new buffer.
583
    TMemoryBuffer new_buffer(buf, sz, policy);
584
    // Move it into ourself.
585
    this->swap(new_buffer);
586
    // Our old self gets destroyed.
587
  }
588
 
589
  std::string readAsString(uint32_t len) {
590
    std::string str;
591
    (void)readAppendToString(str, len);
592
    return str;
593
  }
594
 
595
  uint32_t readAppendToString(std::string& str, uint32_t len);
596
 
597
  void readEnd() {
598
    if (rBase_ == wBase_) {
599
      resetBuffer();
600
    }
601
  }
602
 
603
  uint32_t available_read() const {
604
    // Remember, wBase_ is the real rBound_.
605
    return wBase_ - rBase_;
606
  }
607
 
608
  uint32_t available_write() const {
609
    return wBound_ - wBase_;
610
  }
611
 
612
  // Returns a pointer to where the client can write data to append to
613
  // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a
614
  // write of the provided length.  The returned pointer is very convenient for
615
  // passing to read(), recv(), or similar. You must call wroteBytes() as soon
616
  // as data is written or the buffer will not be aware that data has changed.
617
  uint8_t* getWritePtr(uint32_t len) {
618
    ensureCanWrite(len);
619
    return wBase_;
620
  }
621
 
622
  // Informs the buffer that the client has written 'len' bytes into storage
623
  // that had been provided by getWritePtr().
624
  void wroteBytes(uint32_t len);
625
 
626
 protected:
627
  void swap(TMemoryBuffer& that) {
628
    using std::swap;
629
    swap(buffer_,     that.buffer_);
630
    swap(bufferSize_, that.bufferSize_);
631
 
632
    swap(rBase_,      that.rBase_);
633
    swap(rBound_,     that.rBound_);
634
    swap(wBase_,      that.wBase_);
635
    swap(wBound_,     that.wBound_);
636
 
637
    swap(owner_,      that.owner_);
638
  }
639
 
640
  // Make sure there's at least 'len' bytes available for writing.
641
  void ensureCanWrite(uint32_t len);
642
 
643
  // Compute the position and available data for reading.
644
  void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
645
 
646
  uint32_t readSlow(uint8_t* buf, uint32_t len);
647
 
648
  void writeSlow(const uint8_t* buf, uint32_t len);
649
 
650
  const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
651
 
652
  // Data buffer
653
  uint8_t* buffer_;
654
 
655
  // Allocated buffer size
656
  uint32_t bufferSize_;
657
 
658
  // Is this object the owner of the buffer?
659
  bool owner_;
660
 
661
  // Don't forget to update constrctors, initCommon, and swap if
662
  // you add new members.
663
};
664
 
665
}}} // apache::thrift::transport
666
 
667
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_