Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
21
#define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
22
 
23
#include <boost/lexical_cast.hpp>
24
#include <transport/TTransport.h>
25
 
26
struct z_stream_s;
27
 
28
namespace apache { namespace thrift { namespace transport {
29
 
30
class TZlibTransportException : public TTransportException {
31
 public:
32
  TZlibTransportException(int status, const char* msg) :
33
    TTransportException(TTransportException::INTERNAL_ERROR,
34
                        errorMessage(status, msg)),
35
    zlib_status_(status),
36
    zlib_msg_(msg == NULL ? "(null)" : msg) {}
37
 
38
  virtual ~TZlibTransportException() throw() {}
39
 
40
  int getZlibStatus() { return zlib_status_; }
41
  std::string getZlibMessage() { return zlib_msg_; }
42
 
43
  static std::string errorMessage(int status, const char* msg) {
44
    std::string rv = "zlib error: ";
45
    if (msg) {
46
      rv += msg;
47
    } else {
48
      rv += "(no message)";
49
    }
50
    rv += " (status = ";
51
    rv += boost::lexical_cast<std::string>(status);
52
    rv += ")";
53
    return rv;
54
  }
55
 
56
  int zlib_status_;
57
  std::string zlib_msg_;
58
};
59
 
60
/**
61
 * This transport uses zlib's compressed format on the "far" side.
62
 *
63
 * There are two kinds of TZlibTransport objects:
64
 * - Standalone objects are used to encode self-contained chunks of data
65
 *   (like structures).  They include checksums.
66
 * - Non-standalone transports are used for RPC.  They are not implemented yet.
67
 *
68
 * TODO(dreiss): Don't do an extra copy of the compressed data if
69
 *               the underlying transport is TBuffered or TMemory.
70
 *
71
 */
72
class TZlibTransport : public TTransport {
73
 public:
74
 
75
  /**
76
   * @param transport    The transport to read compressed data from
77
   *                     and write compressed data to.
78
   * @param use_for_rpc  True if this object will be used for RPC,
79
   *                     false if this is a standalone object.
80
   * @param urbuf_size   Uncompressed buffer size for reading.
81
   * @param crbuf_size   Compressed buffer size for reading.
82
   * @param uwbuf_size   Uncompressed buffer size for writing.
83
   * @param cwbuf_size   Compressed buffer size for writing.
84
   *
85
   * TODO(dreiss): Write a constructor that isn't a pain.
86
   */
87
  TZlibTransport(boost::shared_ptr<TTransport> transport,
88
                 bool use_for_rpc,
89
                 int urbuf_size = DEFAULT_URBUF_SIZE,
90
                 int crbuf_size = DEFAULT_CRBUF_SIZE,
91
                 int uwbuf_size = DEFAULT_UWBUF_SIZE,
92
                 int cwbuf_size = DEFAULT_CWBUF_SIZE) :
93
    transport_(transport),
94
    standalone_(!use_for_rpc),
95
    urpos_(0),
96
    uwpos_(0),
97
    input_ended_(false),
98
    output_flushed_(false),
99
    urbuf_size_(urbuf_size),
100
    crbuf_size_(crbuf_size),
101
    uwbuf_size_(uwbuf_size),
102
    cwbuf_size_(cwbuf_size),
103
    urbuf_(NULL),
104
    crbuf_(NULL),
105
    uwbuf_(NULL),
106
    cwbuf_(NULL),
107
    rstream_(NULL),
108
    wstream_(NULL)
109
  {
110
 
111
    if (!standalone_) {
112
      throw TTransportException(
113
          TTransportException::BAD_ARGS,
114
          "TZLibTransport has not been tested for RPC.");
115
    }
116
 
117
    if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
118
      // Have to copy this into a local because of a linking issue.
119
      int minimum = MIN_DIRECT_DEFLATE_SIZE;
120
      throw TTransportException(
121
          TTransportException::BAD_ARGS,
122
          "TZLibTransport: uncompressed write buffer must be at least"
123
          + boost::lexical_cast<std::string>(minimum) + ".");
124
    }
125
 
126
    try {
127
      urbuf_ = new uint8_t[urbuf_size];
128
      crbuf_ = new uint8_t[crbuf_size];
129
      uwbuf_ = new uint8_t[uwbuf_size];
130
      cwbuf_ = new uint8_t[cwbuf_size];
131
 
132
      // Don't call this outside of the constructor.
133
      initZlib();
134
 
135
    } catch (...) {
136
      delete[] urbuf_;
137
      delete[] crbuf_;
138
      delete[] uwbuf_;
139
      delete[] cwbuf_;
140
      throw;
141
    }
142
  }
143
 
144
  // Don't call this outside of the constructor.
145
  void initZlib();
146
 
147
  ~TZlibTransport();
148
 
149
  bool isOpen();
150
 
151
  void open() {
152
    transport_->open();
153
  }
154
 
155
  void close() {
156
    transport_->close();
157
  }
158
 
159
  uint32_t read(uint8_t* buf, uint32_t len);
160
 
161
  void write(const uint8_t* buf, uint32_t len);
162
 
163
  void flush();
164
 
165
  const uint8_t* borrow(uint8_t* buf, uint32_t* len);
166
 
167
  void consume(uint32_t len);
168
 
169
  void verifyChecksum();
170
 
171
   /**
172
    * TODO(someone_smart): Choose smart defaults.
173
    */
174
  static const int DEFAULT_URBUF_SIZE = 128;
175
  static const int DEFAULT_CRBUF_SIZE = 1024;
176
  static const int DEFAULT_UWBUF_SIZE = 128;
177
  static const int DEFAULT_CWBUF_SIZE = 1024;
178
 
179
 protected:
180
 
181
  inline void checkZlibRv(int status, const char* msg);
182
  inline void checkZlibRvNothrow(int status, const char* msg);
183
  inline int readAvail();
184
  void flushToZlib(const uint8_t* buf, int len, bool finish = false);
185
 
186
  // Writes smaller than this are buffered up.
187
  // Larger (or equal) writes are dumped straight to zlib.
188
  static const int MIN_DIRECT_DEFLATE_SIZE = 32;
189
 
190
  boost::shared_ptr<TTransport> transport_;
191
  bool standalone_;
192
 
193
  int urpos_;
194
  int uwpos_;
195
 
196
  /// True iff zlib has reached the end of a stream.
197
  /// This is only ever true in standalone protcol objects.
198
  bool input_ended_;
199
  /// True iff we have flushed the output stream.
200
  /// This is only ever true in standalone protcol objects.
201
  bool output_flushed_;
202
 
203
  int urbuf_size_;
204
  int crbuf_size_;
205
  int uwbuf_size_;
206
  int cwbuf_size_;
207
 
208
  uint8_t* urbuf_;
209
  uint8_t* crbuf_;
210
  uint8_t* uwbuf_;
211
  uint8_t* cwbuf_;
212
 
213
  struct z_stream_s* rstream_;
214
  struct z_stream_s* wstream_;
215
};
216
 
217
}}} // apache::thrift::transport
218
 
219
#endif // #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_