Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
21
#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
22
 
23
#include "TTransport.h"
24
#include "Thrift.h"
25
#include "TProcessor.h"
26
 
27
#include <string>
28
#include <stdio.h>
29
 
30
#include <pthread.h>
31
 
32
#include <boost/shared_ptr.hpp>
33
 
34
namespace apache { namespace thrift { namespace transport {
35
 
36
using apache::thrift::TProcessor;
37
using apache::thrift::protocol::TProtocolFactory;
38
 
39
/**
 * Data pertaining to a single event.
 *
 * eventBuff_ is owned by this object and is released with delete[] in the
 * destructor, so it must be either NULL or memory obtained from new[].
 *
 * The struct is non-copyable: a member-wise copy would leave two objects
 * owning the same eventBuff_, and both would delete[] it.
 */
typedef struct eventInfo {
  // Event payload; owned by this object (delete[]'d on destruction). May be NULL.
  uint8_t* eventBuff_;
  // Size of the event payload in bytes.
  uint32_t eventSize_;
  // Cursor within eventBuff_ (presumably the fill/consume position — confirm
  // against TFileTransport.cpp).
  uint32_t eventBuffPos_;

  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

  ~eventInfo() {
    // delete[] on a NULL pointer is a well-defined no-op, so no check is needed.
    delete[] eventBuff_;
  }

 private:
  // Non-copyable (pre-C++11 idiom): declared but intentionally never defined.
  eventInfo(const eventInfo&);
  eventInfo& operator=(const eventInfo&);
} eventInfo;
52
 
53
// information about current read state
54
typedef struct readState {
55
  eventInfo* event_;
56
 
57
  // keep track of event size
58
  uint8_t   eventSizeBuff_[4];
59
  uint8_t   eventSizeBuffPos_;
60
  bool      readingSize_;
61
 
62
  // read buffer variables
63
  int32_t  bufferPtr_;
64
  int32_t  bufferLen_;
65
 
66
  // last successful dispatch point
67
  int32_t lastDispatchPtr_;
68
 
69
  void resetState(uint32_t lastDispatchPtr) {
70
    readingSize_ = true;
71
    eventSizeBuffPos_ = 0;
72
    lastDispatchPtr_ = lastDispatchPtr;
73
  }
74
 
75
  void resetAllValues() {
76
    resetState(0);
77
    bufferPtr_ = 0;
78
    bufferLen_ = 0;
79
    if (event_) {
80
      delete(event_);
81
    }
82
    event_ = 0;
83
  }
84
 
85
  readState() {
86
    event_ = 0;
87
   resetAllValues();
88
  }
89
 
90
  ~readState() {
91
    if (event_) {
92
      delete(event_);
93
    }
94
  }
95
 
96
} readState;
97
 
98
/**
99
 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
100
 * to be written to disk.  Should be used in the following way:
101
 *  1) Buffer created
102
 *  2) Buffer written to (addEvent)
103
 *  3) Buffer read from (getNext)
104
 *  4) Buffer reset (reset)
105
 *  5) Go back to 2, or destroy buffer
106
 *
107
 * The buffer should never be written to after it is read from, unless it is reset first.
108
 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
109
 *       which uses the buffer in this way.
110
 *
111
 */
112
class TFileTransportBuffer {
113
  public:
114
    TFileTransportBuffer(uint32_t size);
115
    ~TFileTransportBuffer();
116
 
117
    bool addEvent(eventInfo *event);
118
    eventInfo* getNext();
119
    void reset();
120
    bool isFull();
121
    bool isEmpty();
122
 
123
  private:
124
    TFileTransportBuffer(); // should not be used
125
 
126
    enum mode {
127
      WRITE,
128
      READ
129
    };
130
    mode bufferMode_;
131
 
132
    uint32_t writePoint_;
133
    uint32_t readPoint_;
134
    uint32_t size_;
135
    eventInfo** buffer_;
136
};
137
 
138
/**
139
 * Abstract interface for transports used to read files
140
 */
141
class TFileReaderTransport : virtual public TTransport {
142
 public:
143
  virtual int32_t getReadTimeout() = 0;
144
  virtual void setReadTimeout(int32_t readTimeout) = 0;
145
 
146
  virtual uint32_t getNumChunks() = 0;
147
  virtual uint32_t getCurChunk() = 0;
148
  virtual void seekToChunk(int32_t chunk) = 0;
149
  virtual void seekToEnd() = 0;
150
};
151
 
152
/**
153
 * Abstract interface for transports used to write files
154
 */
155
class TFileWriterTransport : virtual public TTransport {
156
 public:
157
  virtual uint32_t getChunkSize() = 0;
158
  virtual void setChunkSize(uint32_t chunkSize) = 0;
159
};
160
 
161
/**
162
 * File implementation of a transport. Reads and writes are done to a
163
 * file on disk.
164
 *
165
 */
166
class TFileTransport : public TFileReaderTransport,
167
                       public TFileWriterTransport {
168
 public:
169
  TFileTransport(std::string path, bool readOnly=false);
170
  ~TFileTransport();
171
 
172
  // TODO: what is the correct behaviour for this?
173
  // the log file is generally always open
174
  bool isOpen() {
175
    return true;
176
  }
177
 
178
  void write(const uint8_t* buf, uint32_t len);
179
  void flush();
180
 
181
  uint32_t readAll(uint8_t* buf, uint32_t len);
182
  uint32_t read(uint8_t* buf, uint32_t len);
183
 
184
  // log-file specific functions
185
  void seekToChunk(int32_t chunk);
186
  void seekToEnd();
187
  uint32_t getNumChunks();
188
  uint32_t getCurChunk();
189
 
190
  // for changing the output file
191
  void resetOutputFile(int fd, std::string filename, int64_t offset);
192
 
193
  // Setter/Getter functions for user-controllable options
194
  void setReadBuffSize(uint32_t readBuffSize) {
195
    if (readBuffSize) {
196
      readBuffSize_ = readBuffSize;
197
    }
198
  }
199
  uint32_t getReadBuffSize() {
200
    return readBuffSize_;
201
  }
202
 
203
  static const int32_t TAIL_READ_TIMEOUT = -1;
204
  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
205
  void setReadTimeout(int32_t readTimeout) {
206
    readTimeout_ = readTimeout;
207
  }
208
  int32_t getReadTimeout() {
209
    return readTimeout_;
210
  }
211
 
212
  void setChunkSize(uint32_t chunkSize) {
213
    if (chunkSize) {
214
      chunkSize_ = chunkSize;
215
    }
216
  }
217
  uint32_t getChunkSize() {
218
    return chunkSize_;
219
  }
220
 
221
  void setEventBufferSize(uint32_t bufferSize) {
222
    if (bufferAndThreadInitialized_) {
223
      GlobalOutput("Cannot change the buffer size after writer thread started");
224
      return;
225
    }
226
    eventBufferSize_ = bufferSize;
227
  }
228
 
229
  uint32_t getEventBufferSize() {
230
    return eventBufferSize_;
231
  }
232
 
233
  void setFlushMaxUs(uint32_t flushMaxUs) {
234
    if (flushMaxUs) {
235
      flushMaxUs_ = flushMaxUs;
236
    }
237
  }
238
  uint32_t getFlushMaxUs() {
239
    return flushMaxUs_;
240
  }
241
 
242
  void setFlushMaxBytes(uint32_t flushMaxBytes) {
243
    if (flushMaxBytes) {
244
      flushMaxBytes_ = flushMaxBytes;
245
    }
246
  }
247
  uint32_t getFlushMaxBytes() {
248
    return flushMaxBytes_;
249
  }
250
 
251
  void setMaxEventSize(uint32_t maxEventSize) {
252
    maxEventSize_ = maxEventSize;
253
  }
254
  uint32_t getMaxEventSize() {
255
    return maxEventSize_;
256
  }
257
 
258
  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
259
    maxCorruptedEvents_ = maxCorruptedEvents;
260
  }
261
  uint32_t getMaxCorruptedEvents() {
262
    return maxCorruptedEvents_;
263
  }
264
 
265
  void setEofSleepTimeUs(uint32_t eofSleepTime) {
266
    if (eofSleepTime) {
267
      eofSleepTime_ = eofSleepTime;
268
    }
269
  }
270
  uint32_t getEofSleepTimeUs() {
271
    return eofSleepTime_;
272
  }
273
 
274
 private:
275
  // helper functions for writing to a file
276
  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
277
  bool swapEventBuffers(struct timespec* deadline);
278
  bool initBufferAndWriteThread();
279
 
280
  // control for writer thread
281
  static void* startWriterThread(void* ptr) {
282
    (((TFileTransport*)ptr)->writerThread());
283
    return 0;
284
  }
285
  void writerThread();
286
 
287
  // helper functions for reading from a file
288
  eventInfo* readEvent();
289
 
290
  // event corruption-related functions
291
  bool isEventCorrupted();
292
  void performRecovery();
293
 
294
  // Utility functions
295
  void openLogFile();
296
  void getNextFlushTime(struct timespec* ts_next_flush);
297
 
298
  // Class variables
299
  readState readState_;
300
  uint8_t* readBuff_;
301
  eventInfo* currentEvent_;
302
 
303
  uint32_t readBuffSize_;
304
  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
305
 
306
  int32_t readTimeout_;
307
  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
308
 
309
  // size of chunks that file will be split up into
310
  uint32_t chunkSize_;
311
  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
312
 
313
  // size of event buffers
314
  uint32_t eventBufferSize_;
315
  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
316
 
317
  // max number of microseconds that can pass without flushing
318
  uint32_t flushMaxUs_;
319
  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
320
 
321
  // max number of bytes that can be written without flushing
322
  uint32_t flushMaxBytes_;
323
  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
324
 
325
  // max event size
326
  uint32_t maxEventSize_;
327
  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
328
 
329
  // max number of corrupted events per chunk
330
  uint32_t maxCorruptedEvents_;
331
  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
332
 
333
  // sleep duration when EOF is hit
334
  uint32_t eofSleepTime_;
335
  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
336
 
337
  // sleep duration when a corrupted event is encountered
338
  uint32_t corruptedEventSleepTime_;
339
  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
340
 
341
  // writer thread id
342
  pthread_t writerThreadId_;
343
 
344
  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
345
  // needs to be written to the file.  The buffers are swapped by the writer thread.
346
  TFileTransportBuffer *dequeueBuffer_;
347
  TFileTransportBuffer *enqueueBuffer_;
348
 
349
  // conditions used to block when the buffer is full or empty
350
  pthread_cond_t notFull_, notEmpty_;
351
  volatile bool closing_;
352
 
353
  // To keep track of whether the buffer has been flushed
354
  pthread_cond_t flushed_;
355
  volatile bool forceFlush_;
356
 
357
  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
358
  pthread_mutex_t mutex_;
359
 
360
  // File information
361
  std::string filename_;
362
  int fd_;
363
 
364
  // Whether the writer thread and buffers have been initialized
365
  bool bufferAndThreadInitialized_;
366
 
367
  // Offset within the file
368
  off_t offset_;
369
 
370
  // event corruption information
371
  uint32_t lastBadChunk_;
372
  uint32_t numCorruptedEventsInChunk_;
373
 
374
  bool readOnly_;
375
};
376
 
377
// Exception thrown when EOF is hit
378
class TEOFException : public TTransportException {
379
 public:
380
  TEOFException():
381
    TTransportException(TTransportException::END_OF_FILE) {};
382
};
383
 
384
 
385
// wrapper class to process events from a file containing thrift events
386
class TFileProcessor {
387
 public:
388
  /**
389
   * Constructor that defaults output transport to null transport
390
   *
391
   * @param processor processes log-file events
392
   * @param protocolFactory protocol factory
393
   * @param inputTransport file transport
394
   */
395
  TFileProcessor(boost::shared_ptr<TProcessor> processor,
396
                 boost::shared_ptr<TProtocolFactory> protocolFactory,
397
                 boost::shared_ptr<TFileReaderTransport> inputTransport);
398
 
399
  TFileProcessor(boost::shared_ptr<TProcessor> processor,
400
                 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
401
                 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
402
                 boost::shared_ptr<TFileReaderTransport> inputTransport);
403
 
404
  /**
405
   * Constructor
406
   *
407
   * @param processor processes log-file events
408
   * @param protocolFactory protocol factory
409
   * @param inputTransport input file transport
410
   * @param output output transport
411
   */
412
  TFileProcessor(boost::shared_ptr<TProcessor> processor,
413
                 boost::shared_ptr<TProtocolFactory> protocolFactory,
414
                 boost::shared_ptr<TFileReaderTransport> inputTransport,
415
                 boost::shared_ptr<TTransport> outputTransport);
416
 
417
  /**
418
   * processes events from the file
419
   *
420
   * @param numEvents number of events to process (0 for unlimited)
421
   * @param tail tails the file if true
422
   */
423
  void process(uint32_t numEvents, bool tail);
424
 
425
  /**
426
   * process events until the end of the chunk
427
   *
428
   */
429
  void processChunk();
430
 
431
 private:
432
  boost::shared_ptr<TProcessor> processor_;
433
  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
434
  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
435
  boost::shared_ptr<TFileReaderTransport> inputTransport_;
436
  boost::shared_ptr<TTransport> outputTransport_;
437
};
438
 
439
 
440
}}} // apache::thrift::transport
441
 
442
#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_