Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <cassert>
21
#include <cstring>
22
#include <algorithm>
23
#include <transport/TZlibTransport.h>
24
#include <zlib.h>
25
 
26
using std::string;
27
 
28
namespace apache { namespace thrift { namespace transport {
29
 
30
// Don't call this outside of the constructor.
31
void TZlibTransport::initZlib() {
32
  int rv;
33
  bool r_init = false;
34
  try {
35
    rstream_ = new z_stream;
36
    wstream_ = new z_stream;
37
 
38
    rstream_->zalloc = Z_NULL;
39
    wstream_->zalloc = Z_NULL;
40
    rstream_->zfree  = Z_NULL;
41
    wstream_->zfree  = Z_NULL;
42
    rstream_->opaque = Z_NULL;
43
    wstream_->opaque = Z_NULL;
44
 
45
    rstream_->next_in   = crbuf_;
46
    wstream_->next_in   = uwbuf_;
47
    rstream_->next_out  = urbuf_;
48
    wstream_->next_out  = cwbuf_;
49
    rstream_->avail_in  = 0;
50
    wstream_->avail_in  = 0;
51
    rstream_->avail_out = urbuf_size_;
52
    wstream_->avail_out = cwbuf_size_;
53
 
54
    rv = inflateInit(rstream_);
55
    checkZlibRv(rv, rstream_->msg);
56
 
57
    // Have to set this flag so we know whether to de-initialize.
58
    r_init = true;
59
 
60
    rv = deflateInit(wstream_, Z_DEFAULT_COMPRESSION);
61
    checkZlibRv(rv, wstream_->msg);
62
  }
63
 
64
  catch (...) {
65
    if (r_init) {
66
      rv = inflateEnd(rstream_);
67
      checkZlibRvNothrow(rv, rstream_->msg);
68
    }
69
    // There is no way we can get here if wstream_ was initialized.
70
 
71
    throw;
72
  }
73
}
74
 
75
inline void TZlibTransport::checkZlibRv(int status, const char* message) {
76
  if (status != Z_OK) {
77
    throw TZlibTransportException(status, message);
78
  }
79
}
80
 
81
inline void TZlibTransport::checkZlibRvNothrow(int status, const char* message) {
82
  if (status != Z_OK) {
83
    string output = "TZlibTransport: zlib failure in destructor: " +
84
      TZlibTransportException::errorMessage(status, message);
85
    GlobalOutput(output.c_str());
86
  }
87
}
88
 
89
// Shuts down both zlib streams (failures are only logged, never thrown)
// and then frees the stream objects and the four transfer buffers.
TZlibTransport::~TZlibTransport() {
  const int inflate_rv = inflateEnd(rstream_);
  checkZlibRvNothrow(inflate_rv, rstream_->msg);

  const int deflate_rv = deflateEnd(wstream_);
  checkZlibRvNothrow(deflate_rv, wstream_->msg);

  // Read-side and write-side buffers.
  delete[] urbuf_;
  delete[] crbuf_;
  delete[] uwbuf_;
  delete[] cwbuf_;

  // The stream structs themselves.
  delete rstream_;
  delete wstream_;
}
103
 
104
bool TZlibTransport::isOpen() {
105
  return (readAvail() > 0) || transport_->isOpen();
106
}
107
 
108
// READING STRATEGY
109
//
110
// We have two buffers for reading: one containing the compressed data (crbuf_)
111
// and one containing the uncompressed data (urbuf_).  When read is called,
112
// we repeat the following steps until we have satisfied the request:
113
// - Copy data from urbuf_ into the caller's buffer.
114
// - If we had enough, return.
115
// - If urbuf_ is empty, read some data into it from the underlying transport.
116
// - Inflate data from crbuf_ into urbuf_.
117
//
118
// In standalone objects, we set input_ended_ to true when inflate returns
119
// Z_STREAM_END.  This makes it possible to confirm that a checksum was verified.
120
 
121
inline int TZlibTransport::readAvail() {
122
  return urbuf_size_ - rstream_->avail_out - urpos_;
123
}
124
 
125
uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) {
126
  int need = len;
127
 
128
  // TODO(dreiss): Skip urbuf on big reads.
129
 
130
  while (true) {
131
    // Copy out whatever we have available, then give them the min of
132
    // what we have and what they want, then advance indices.
133
    int give = std::min(readAvail(), need);
134
    memcpy(buf, urbuf_ + urpos_, give);
135
    need -= give;
136
    buf += give;
137
    urpos_ += give;
138
 
139
    // If they were satisfied, we are done.
140
    if (need == 0) {
141
      return len;
142
    }
143
 
144
    // If we get to this point, we need to get some more data.
145
 
146
    // If zlib has reported the end of a stream, we can't really do any more.
147
    if (input_ended_) {
148
      return len - need;
149
    }
150
 
151
    // The uncompressed read buffer is empty, so reset the stream fields.
152
    rstream_->next_out  = urbuf_;
153
    rstream_->avail_out = urbuf_size_;
154
    urpos_ = 0;
155
 
156
    // If we don't have any more compressed data available,
157
    // read some from the underlying transport.
158
    if (rstream_->avail_in == 0) {
159
      uint32_t got = transport_->read(crbuf_, crbuf_size_);
160
      if (got == 0) {
161
        return len - need;
162
      }
163
      rstream_->next_in  = crbuf_;
164
      rstream_->avail_in = got;
165
    }
166
 
167
    // We have some compressed data now.  Uncompress it.
168
    int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
169
 
170
    if (zlib_rv == Z_STREAM_END) {
171
      if (standalone_) {
172
        input_ended_ = true;
173
      }
174
    } else {
175
      checkZlibRv(zlib_rv, rstream_->msg);
176
    }
177
 
178
    // Okay.  The read buffer should have whatever we can give it now.
179
    // Loop back to the start and try to give some more.
180
  }
181
}
182
 
183
 
184
// WRITING STRATEGY
185
//
186
// We buffer up small writes before sending them to zlib, so our logic is:
187
// - Is the write big?
188
//   - Send the buffer to zlib.
189
//   - Send this data to zlib.
190
// - Is the write small?
191
//   - Is there insufficient space in the buffer for it?
192
//     - Send the buffer to zlib.
193
//   - Copy the data to the buffer.
194
//
195
// We have two buffers for writing also: the uncompressed buffer (mentioned
196
// above) and the compressed buffer.  When sending data to zlib we loop over
197
// the following until the source (uncompressed buffer or big write) is empty:
198
// - Is there no more space in the compressed buffer?
199
//   - Write the compressed buffer to the underlying transport.
200
// - Deflate from the source into the compressed buffer.
201
 
202
void TZlibTransport::write(const uint8_t* buf, uint32_t len) {
203
  // zlib's "deflate" function has enough logic in it that I think
204
  // we're better off (performance-wise) buffering up small writes.
205
  if ((int)len > MIN_DIRECT_DEFLATE_SIZE) {
206
    flushToZlib(uwbuf_, uwpos_);
207
    uwpos_ = 0;
208
    flushToZlib(buf, len);
209
  } else if (len > 0) {
210
    if (uwbuf_size_ - uwpos_ < (int)len) {
211
      flushToZlib(uwbuf_, uwpos_);
212
      uwpos_ = 0;
213
    }
214
    memcpy(uwbuf_ + uwpos_, buf, len);
215
    uwpos_ += len;
216
  }
217
}
218
 
219
void TZlibTransport::flush()  {
220
  flushToZlib(uwbuf_, uwpos_, true);
221
  assert((int)wstream_->avail_out != cwbuf_size_);
222
  transport_->write(cwbuf_, cwbuf_size_ - wstream_->avail_out);
223
  transport_->flush();
224
}
225
 
226
void TZlibTransport::flushToZlib(const uint8_t* buf, int len, bool finish) {
227
  int flush = (finish ? Z_FINISH : Z_NO_FLUSH);
228
 
229
  wstream_->next_in  = const_cast<uint8_t*>(buf);
230
  wstream_->avail_in = len;
231
 
232
  while (wstream_->avail_in > 0 || finish) {
233
    // If our ouput buffer is full, flush to the underlying transport.
234
    if (wstream_->avail_out == 0) {
235
      transport_->write(cwbuf_, cwbuf_size_);
236
      wstream_->next_out  = cwbuf_;
237
      wstream_->avail_out = cwbuf_size_;
238
    }
239
 
240
    int zlib_rv = deflate(wstream_, flush);
241
 
242
    if (finish && zlib_rv == Z_STREAM_END) {
243
      assert(wstream_->avail_in == 0);
244
      break;
245
    }
246
 
247
    checkZlibRv(zlib_rv, wstream_->msg);
248
  }
249
}
250
 
251
const uint8_t* TZlibTransport::borrow(uint8_t* buf, uint32_t* len) {
252
  // Don't try to be clever with shifting buffers.
253
  // If we have enough data, give a pointer to it,
254
  // otherwise let the protcol use its slow path.
255
  if (readAvail() >= (int)*len) {
256
    *len = (uint32_t)readAvail();
257
    return urbuf_ + urpos_;
258
  }
259
  return NULL;
260
}
261
 
262
void TZlibTransport::consume(uint32_t len) {
263
  if (readAvail() >= (int)len) {
264
    urpos_ += len;
265
  } else {
266
    throw TTransportException(TTransportException::BAD_ARGS,
267
                              "consume did not follow a borrow.");
268
  }
269
}
270
 
271
void TZlibTransport::verifyChecksum() {
272
  if (!standalone_) {
273
    throw TTransportException(
274
        TTransportException::BAD_ARGS,
275
        "TZLibTransport can only verify checksums for standalone objects.");
276
  }
277
 
278
  if (!input_ended_) {
279
    // This should only be called when reading is complete,
280
    // but it's possible that the whole checksum has not been fed to zlib yet.
281
    // We try to read an extra byte here to force zlib to finish the stream.
282
    // It might not always be easy to "unread" this byte,
283
    // but we throw an exception if we get it, which is not really
284
    // a recoverable error, so it doesn't matter.
285
    uint8_t buf[1];
286
    uint32_t got = this->read(buf, sizeof(buf));
287
    if (got || !input_ended_) {
288
      throw TTransportException(
289
          TTransportException::CORRUPTED_DATA,
290
          "Zlib stream not complete.");
291
    }
292
  }
293
 
294
  // If the checksum had been bad, we would have gotten an error while
295
  // inflating.
296
}
297
 
298
 
299
}}} // apache::thrift::transport