Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
21
#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
22
 
23
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <new>
#include <string>
#include <transport/TTransport.h>
// Include the buffered transports that used to be defined here.
#include <transport/TBufferTransports.h>
#include <transport/TFileTransport.h>
31
 
32
namespace apache { namespace thrift { namespace transport {
33
 
34
/**
 * The null transport is a dummy transport that doesn't actually do anything.
 * It is analogous to /dev/null: you can never read anything from it, and it
 * will let you write anything you want to it, though the data won't actually
 * go anywhere.
 *
 */
41
class TNullTransport : public TTransport {
 public:
  TNullTransport() {}

  ~TNullTransport() {}

  /** Always reports the transport as open. */
  bool isOpen() { return true; }

  /** Nothing to open; intentionally empty. */
  void open() {}

  /** Accepts any buffer and length and discards them without touching the data. */
  void write(const uint8_t* /* buf */, uint32_t /* len */) {}

};
58
 
59
 
60
/**
 * TPipedTransport. This transport allows piping of a request from one
 * transport to another when either readEnd() or writeEnd() is called. The
 * typical use case for this is to log a request or a reply to disk.
 * The underlying buffer expands to keep a copy of the entire
 * request/response.
 *
 */
68
class TPipedTransport : virtual public TTransport {
69
 public:
70
  TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
71
                  boost::shared_ptr<TTransport> dstTrans) :
72
    srcTrans_(srcTrans),
73
    dstTrans_(dstTrans),
74
    rBufSize_(512), rPos_(0), rLen_(0),
75
    wBufSize_(512), wLen_(0) {
76
 
77
    // default is to to pipe the request when readEnd() is called
78
    pipeOnRead_ = true;
79
    pipeOnWrite_ = false;
80
 
81
    rBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * rBufSize_);
82
    wBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * wBufSize_);
83
  }
84
 
85
  TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
86
                  boost::shared_ptr<TTransport> dstTrans,
87
                  uint32_t sz) :
88
    srcTrans_(srcTrans),
89
    dstTrans_(dstTrans),
90
    rBufSize_(512), rPos_(0), rLen_(0),
91
    wBufSize_(sz), wLen_(0) {
92
 
93
    rBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * rBufSize_);
94
    wBuf_ = (uint8_t*) std::malloc(sizeof(uint8_t) * wBufSize_);
95
  }
96
 
97
  ~TPipedTransport() {
98
    std::free(rBuf_);
99
    std::free(wBuf_);
100
  }
101
 
102
  bool isOpen() {
103
    return srcTrans_->isOpen();
104
  }
105
 
106
  bool peek() {
107
    if (rPos_ >= rLen_) {
108
      // Double the size of the underlying buffer if it is full
109
      if (rLen_ == rBufSize_) {
110
        rBufSize_ *=2;
111
        rBuf_ = (uint8_t *)std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
112
      }
113
 
114
      // try to fill up the buffer
115
      rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
116
    }
117
    return (rLen_ > rPos_);
118
  }
119
 
120
 
121
  void open() {
122
    srcTrans_->open();
123
  }
124
 
125
  void close() {
126
    srcTrans_->close();
127
  }
128
 
129
  void setPipeOnRead(bool pipeVal) {
130
    pipeOnRead_ = pipeVal;
131
  }
132
 
133
  void setPipeOnWrite(bool pipeVal) {
134
    pipeOnWrite_ = pipeVal;
135
  }
136
 
137
  uint32_t read(uint8_t* buf, uint32_t len);
138
 
139
  void readEnd() {
140
 
141
    if (pipeOnRead_) {
142
      dstTrans_->write(rBuf_, rPos_);
143
      dstTrans_->flush();
144
    }
145
 
146
    srcTrans_->readEnd();
147
 
148
    // If requests are being pipelined, copy down our read-ahead data,
149
    // then reset our state.
150
    int read_ahead = rLen_ - rPos_;
151
    memcpy(rBuf_, rBuf_ + rPos_, read_ahead);
152
    rPos_ = 0;
153
    rLen_ = read_ahead;
154
  }
155
 
156
  void write(const uint8_t* buf, uint32_t len);
157
 
158
  void writeEnd() {
159
    if (pipeOnWrite_) {
160
      dstTrans_->write(wBuf_, wLen_);
161
      dstTrans_->flush();
162
    }
163
  }
164
 
165
  void flush();
166
 
167
  boost::shared_ptr<TTransport> getTargetTransport() {
168
    return dstTrans_;
169
  }
170
 
171
 protected:
172
  boost::shared_ptr<TTransport> srcTrans_;
173
  boost::shared_ptr<TTransport> dstTrans_;
174
 
175
  uint8_t* rBuf_;
176
  uint32_t rBufSize_;
177
  uint32_t rPos_;
178
  uint32_t rLen_;
179
 
180
  uint8_t* wBuf_;
181
  uint32_t wBufSize_;
182
  uint32_t wLen_;
183
 
184
  bool pipeOnRead_;
185
  bool pipeOnWrite_;
186
};
187
 
188
 
189
/**
 * Wraps a transport into a TPipedTransport instance.
 *
 */
193
class TPipedTransportFactory : public TTransportFactory {
194
 public:
195
  TPipedTransportFactory() {}
196
  TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
197
    initializeTargetTransport(dstTrans);
198
  }
199
  virtual ~TPipedTransportFactory() {}
200
 
201
  /**
202
   * Wraps the base transport into a piped transport.
203
   */
204
  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
205
    return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
206
  }
207
 
208
  virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
209
    if (dstTrans_.get() == NULL) {
210
      dstTrans_ = dstTrans;
211
    } else {
212
      throw TException("Target transport already initialized");
213
    }
214
  }
215
 
216
 protected:
217
  boost::shared_ptr<TTransport> dstTrans_;
218
};
219
 
220
/**
 * TPipedFileReaderTransport. This is just like a TPipedTransport, except
 * that it also derives from TFileReaderTransport, so that clients who rely
 * on the TFileReaderTransport interface can still use it on the piped
 * transport.
 *
 */
226
class TPipedFileReaderTransport : public TPipedTransport,
227
                                  public TFileReaderTransport {
228
 public:
229
  TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
230
 
231
  ~TPipedFileReaderTransport();
232
 
233
  // TTransport functions
234
  bool isOpen();
235
  bool peek();
236
  void open();
237
  void close();
238
  uint32_t read(uint8_t* buf, uint32_t len);
239
  uint32_t readAll(uint8_t* buf, uint32_t len);
240
  void readEnd();
241
  void write(const uint8_t* buf, uint32_t len);
242
  void writeEnd();
243
  void flush();
244
 
245
  // TFileReaderTransport functions
246
  int32_t getReadTimeout();
247
  void setReadTimeout(int32_t readTimeout);
248
  uint32_t getNumChunks();
249
  uint32_t getCurChunk();
250
  void seekToChunk(int32_t chunk);
251
  void seekToEnd();
252
 
253
 protected:
254
  // shouldn't be used
255
  TPipedFileReaderTransport();
256
  boost::shared_ptr<TFileReaderTransport> srcTrans_;
257
};
258
 
259
/**
 * Creates a TPipedFileReaderTransport from a source TFileReaderTransport and
 * a destination transport.
 *
 */
263
class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
264
 public:
265
  TPipedFileReaderTransportFactory() {}
266
  TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans)
267
    : TPipedTransportFactory(dstTrans)
268
  {}
269
  virtual ~TPipedFileReaderTransportFactory() {}
270
 
271
  boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
272
    boost::shared_ptr<TFileReaderTransport> pFileReaderTransport = boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
273
    if (pFileReaderTransport.get() != NULL) {
274
      return getFileReaderTransport(pFileReaderTransport);
275
    } else {
276
      return boost::shared_ptr<TTransport>();
277
    }
278
  }
279
 
280
  boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
281
    return boost::shared_ptr<TFileReaderTransport>(new TPipedFileReaderTransport(srcTrans, dstTrans_));
282
  }
283
};
284
 
285
}}} // apache::thrift::transport
286
 
287
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_