Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#ifdef HAVE_CONFIG_H
21
#include "config.h"
22
#endif
23
 
24
#include "TFileTransport.h"
25
#include "TTransportUtils.h"
26
 
27
#include <pthread.h>
28
#ifdef HAVE_SYS_TIME_H
29
#include <sys/time.h>
30
#else
31
#include <time.h>
32
#endif
33
#include <fcntl.h>
34
#include <errno.h>
35
#include <unistd.h>
36
#ifdef HAVE_STRINGS_H
37
#include <strings.h>
38
#endif
39
#include <cstdlib>
40
#include <cstring>
41
#include <iostream>
42
#include <sys/stat.h>
43
 
44
namespace apache { namespace thrift { namespace transport {
45
 
46
using boost::shared_ptr;
47
using namespace std;
48
using namespace apache::thrift::protocol;
49
 
50
#ifndef HAVE_CLOCK_GETTIME
51
 
52
/**
53
 * Fake clock_gettime for systems like darwin
54
 *
55
 */
56
#define CLOCK_REALTIME 0
57
/**
 * Stand-in for clock_gettime() on platforms that lack it, built on
 * gettimeofday().
 *
 * @param clk_id ignored; the wall clock is always used
 * @param tp     receives the current time (microseconds scaled to nanoseconds)
 * @return 0 on success, otherwise the non-zero result of gettimeofday()
 */
static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {
  struct timeval tv;
  const int status = gettimeofday(&tv, NULL);

  // only touch the output when the underlying call succeeded
  if (status == 0) {
    tp->tv_sec = tv.tv_sec;
    tp->tv_nsec = tv.tv_usec * 1000;
  }
  return status;
}
69
#endif
70
 
71
/**
 * Creates a transport bound to the log file at `path` and opens it
 * immediately via openLogFile(), which throws TTransportException if the
 * file cannot be opened.
 *
 * @param path     file to read from / append to
 * @param readOnly when true, write() on this transport throws
 */
TFileTransport::TFileTransport(string path, bool readOnly) :
  // reader state
  readState_(),
  readBuff_(NULL),
  currentEvent_(NULL),
  // tunables, all starting at their defaults
  readBuffSize_(DEFAULT_READ_BUFF_SIZE),
  readTimeout_(NO_TAIL_READ_TIMEOUT),
  chunkSize_(DEFAULT_CHUNK_SIZE),
  eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
  flushMaxUs_(DEFAULT_FLUSH_MAX_US),
  flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
  maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
  maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
  eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
  corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
  // writer state; the thread and its buffers are created on first enqueue
  writerThreadId_(0),
  dequeueBuffer_(NULL),
  enqueueBuffer_(NULL),
  closing_(false),
  forceFlush_(false),
  // file bookkeeping
  filename_(path),
  fd_(0),
  bufferAndThreadInitialized_(false),
  offset_(0),
  lastBadChunk_(0),
  numCorruptedEventsInChunk_(0),
  readOnly_(readOnly)
{
  // synchronization primitives shared between producers and the writer thread
  pthread_mutex_init(&mutex_, NULL);
  pthread_cond_init(&notFull_, NULL);
  pthread_cond_init(&notEmpty_, NULL);
  pthread_cond_init(&flushed_, NULL);

  openLogFile();
}
106
 
107
void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
108
  filename_ = filename;
109
  offset_ = offset;
110
 
111
  // check if current file is still open
112
  if (fd_ > 0) {
113
    // flush any events in the queue
114
    flush();
115
    GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
116
    if (-1 == ::close(fd_)) {
117
      int errno_copy = errno;
118
      GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
119
      throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
120
    }
121
  }
122
 
123
  if (fd) {
124
    fd_ = fd;
125
  } else {
126
    // open file if the input fd is 0
127
    openLogFile();
128
  }
129
}
130
 
131
 
132
/**
 * Shuts the transport down: flushes and joins the writer thread (if one was
 * started), releases the event and read buffers, closes the log file if it is
 * still open, and destroys the mutex / condition variables created by the
 * constructor.
 */
TFileTransport::~TFileTransport() {
  // flush the buffer if a writer thread is active
  if (writerThreadId_ > 0) {
    // reduce the flush timeout so that closing is quicker
    setFlushMaxUs(300*1000);

    // flush output buffer
    flush();

    // set state to closing
    closing_ = true;

    // TODO: make sure event queue is empty
    // currently only the write buffer is flushed
    // we dont actually wait until the queue is empty. This shouldn't be a big
    // deal in the common case because writing is quick

    pthread_join(writerThreadId_, NULL);
    writerThreadId_ = 0;
  }

  // delete / delete[] on a null pointer is a no-op, so no guards are needed
  delete dequeueBuffer_;
  dequeueBuffer_ = NULL;

  delete enqueueBuffer_;
  enqueueBuffer_ = NULL;

  delete[] readBuff_;
  readBuff_ = NULL;

  delete currentEvent_;
  currentEvent_ = NULL;

  // close logfile
  if (fd_ > 0) {
    if (-1 == ::close(fd_)) {
      GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno);
    }
  }

  // the writer thread has been joined, so nothing can still be waiting on
  // these; release what the constructor initialized
  pthread_cond_destroy(&flushed_);
  pthread_cond_destroy(&notEmpty_);
  pthread_cond_destroy(&notFull_);
  pthread_mutex_destroy(&mutex_);
}
180
 
181
bool TFileTransport::initBufferAndWriteThread() {
182
  if (bufferAndThreadInitialized_) {
183
    T_ERROR("Trying to double-init TFileTransport");
184
    return false;
185
  }
186
 
187
  if (writerThreadId_ == 0) {
188
    if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
189
      T_ERROR("Could not create writer thread");
190
      return false;
191
    }
192
  }
193
 
194
  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
195
  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
196
  bufferAndThreadInitialized_ = true;
197
 
198
  return true;
199
}
200
 
201
void TFileTransport::write(const uint8_t* buf, uint32_t len) {
202
  if (readOnly_) {
203
    throw TTransportException("TFileTransport: attempting to write to file opened readonly");
204
  }
205
 
206
  enqueueEvent(buf, len, false);
207
}
208
 
209
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
210
  // can't enqueue more events if file is going to close
211
  if (closing_) {
212
    return;
213
  }
214
 
215
  // make sure that event size is valid
216
  if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
217
    T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
218
    return;
219
  }
220
 
221
  if (eventLen == 0) {
222
    T_ERROR("cannot enqueue an empty event");
223
    return;
224
  }
225
 
226
  eventInfo* toEnqueue = new eventInfo();
227
  toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);
228
  // first 4 bytes is the event length
229
  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
230
  // actual event contents
231
  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
232
  toEnqueue->eventSize_ = eventLen + 4;
233
 
234
  // lock mutex
235
  pthread_mutex_lock(&mutex_);
236
 
237
  // make sure that enqueue buffer is initialized and writer thread is running
238
  if (!bufferAndThreadInitialized_) {
239
    if (!initBufferAndWriteThread()) {
240
      delete toEnqueue;
241
      pthread_mutex_unlock(&mutex_);
242
      return;
243
    }
244
  }
245
 
246
  // Can't enqueue while buffer is full
247
  while (enqueueBuffer_->isFull()) {
248
    pthread_cond_wait(&notFull_, &mutex_);
249
  }
250
 
251
  // add to the buffer
252
  if (!enqueueBuffer_->addEvent(toEnqueue)) {
253
    delete toEnqueue;
254
    pthread_mutex_unlock(&mutex_);
255
    return;
256
  }
257
 
258
  // signal anybody who's waiting for the buffer to be non-empty
259
  pthread_cond_signal(&notEmpty_);
260
 
261
  if (blockUntilFlush) {
262
    pthread_cond_wait(&flushed_, &mutex_);
263
  }
264
 
265
  // this really should be a loop where it makes sure it got flushed
266
  // because condition variables can get triggered by the os for no reason
267
  // it is probably a non-factor for the time being
268
  pthread_mutex_unlock(&mutex_);
269
}
270
 
271
bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
272
  pthread_mutex_lock(&mutex_);
273
  if (deadline != NULL) {
274
    // if we were handed a deadline time struct, do a timed wait
275
    pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
276
  } else {
277
    // just wait until the buffer gets an item
278
    pthread_cond_wait(&notEmpty_, &mutex_);
279
  }
280
 
281
  bool swapped = false;
282
 
283
  // could be empty if we timed out
284
  if (!enqueueBuffer_->isEmpty()) {
285
    TFileTransportBuffer *temp = enqueueBuffer_;
286
    enqueueBuffer_ = dequeueBuffer_;
287
    dequeueBuffer_ = temp;
288
 
289
    swapped = true;
290
  }
291
 
292
  // unlock the mutex and signal if required
293
  pthread_mutex_unlock(&mutex_);
294
 
295
  if (swapped) {
296
    pthread_cond_signal(&notFull_);
297
  }
298
 
299
  return swapped;
300
}
301
 
302
 
303
void TFileTransport::writerThread() {
304
  // open file if it is not open
305
  if(!fd_) {
306
    openLogFile();
307
  }
308
 
309
  // set the offset to the correct value (EOF)
310
  try {
311
    seekToEnd();
312
  } catch (TException &te) {
313
  }
314
 
315
  // throw away any partial events
316
  offset_ += readState_.lastDispatchPtr_;
317
  ftruncate(fd_, offset_);
318
  readState_.resetAllValues();
319
 
320
  // Figure out the next time by which a flush must take place
321
 
322
  struct timespec ts_next_flush;
323
  getNextFlushTime(&ts_next_flush);
324
  uint32_t unflushed = 0;
325
 
326
  while(1) {
327
    // this will only be true when the destructor is being invoked
328
    if(closing_) {
329
      // empty out both the buffers
330
      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
331
        if (-1 == ::close(fd_)) {
332
          int errno_copy = errno;
333
          GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
334
          throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
335
        }
336
        // just be safe and sync to disk
337
        fsync(fd_);
338
        fd_ = 0;
339
        pthread_exit(NULL);
340
        return;
341
      }
342
    }
343
 
344
    if (swapEventBuffers(&ts_next_flush)) {
345
      eventInfo* outEvent;
346
      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
347
        if (!outEvent) {
348
          T_DEBUG_L(1, "Got an empty event");
349
          return;
350
        }
351
 
352
        // sanity check on event
353
        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
354
          T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
355
          continue;
356
        }
357
 
358
        // If chunking is required, then make sure that msg does not cross chunk boundary
359
        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
360
 
361
          // event size must be less than chunk size
362
          if(outEvent->eventSize_ > chunkSize_) {
363
            T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
364
                  outEvent->eventSize_, chunkSize_);
365
            continue;
366
          }
367
 
368
          int64_t chunk1 = offset_/chunkSize_;
369
          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
370
 
371
          // if adding this event will cross a chunk boundary, pad the chunk with zeros
372
          if (chunk1 != chunk2) {
373
            // refetch the offset to keep in sync
374
            offset_ = lseek(fd_, 0, SEEK_CUR);
375
            int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
376
 
377
            uint8_t zeros[padding];
378
            bzero(zeros, padding);
379
            if (-1 == ::write(fd_, zeros, padding)) {
380
              int errno_copy = errno;
381
              GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
382
              throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
383
            }
384
            unflushed += padding;
385
            offset_ += padding;
386
          }
387
        }
388
 
389
        // write the dequeued event to the file
390
        if (outEvent->eventSize_ > 0) {
391
          if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
392
            int errno_copy = errno;
393
            GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
394
            throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
395
          }
396
 
397
          unflushed += outEvent->eventSize_;
398
          offset_ += outEvent->eventSize_;
399
        }
400
      }
401
      dequeueBuffer_->reset();
402
    }
403
 
404
    bool flushTimeElapsed = false;
405
    struct timespec current_time;
406
    clock_gettime(CLOCK_REALTIME, &current_time);
407
 
408
    if (current_time.tv_sec > ts_next_flush.tv_sec ||
409
        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
410
      flushTimeElapsed = true;
411
      getNextFlushTime(&ts_next_flush);
412
    }
413
 
414
    // couple of cases from which a flush could be triggered
415
    if ((flushTimeElapsed && unflushed > 0) ||
416
       unflushed > flushMaxBytes_ ||
417
       forceFlush_) {
418
 
419
      // sync (force flush) file to disk
420
      fsync(fd_);
421
      unflushed = 0;
422
 
423
      // notify anybody waiting for flush completion
424
      forceFlush_ = false;
425
      pthread_cond_broadcast(&flushed_);
426
    }
427
  }
428
}
429
 
430
void TFileTransport::flush() {
431
  // file must be open for writing for any flushing to take place
432
  if (writerThreadId_ <= 0) {
433
    return;
434
  }
435
  // wait for flush to take place
436
  pthread_mutex_lock(&mutex_);
437
 
438
  forceFlush_ = true;
439
 
440
  while (forceFlush_) {
441
    pthread_cond_wait(&flushed_, &mutex_);
442
  }
443
 
444
  pthread_mutex_unlock(&mutex_);
445
}
446
 
447
 
448
uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
449
  uint32_t have = 0;
450
  uint32_t get = 0;
451
 
452
  while (have < len) {
453
    get = read(buf+have, len-have);
454
    if (get <= 0) {
455
      throw TEOFException();
456
    }
457
    have += get;
458
  }
459
 
460
  return have;
461
}
462
 
463
uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
464
  // check if there an event is ready to be read
465
  if (!currentEvent_) {
466
    currentEvent_ = readEvent();
467
  }
468
 
469
  // did not manage to read an event from the file. This could have happened
470
  // if the timeout expired or there was some other error
471
  if (!currentEvent_) {
472
    return 0;
473
  }
474
 
475
  // read as much of the current event as possible
476
  int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
477
  if (remaining <= (int32_t)len) {
478
    // copy over anything thats remaining
479
    if (remaining > 0) {
480
      memcpy(buf,
481
             currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
482
             remaining);
483
    }
484
    delete(currentEvent_);
485
    currentEvent_ = NULL;
486
    return remaining;
487
  }
488
 
489
  // read as much as possible
490
  memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
491
  currentEvent_->eventBuffPos_ += len;
492
  return len;
493
}
494
 
495
/**
 * Reads the next complete event from the file.
 *
 * Data is pulled through readBuff_ in blocks of readBuffSize_ bytes. Each
 * event is a 4-byte size followed by that many payload bytes; a zero size is
 * treated as padding and skipped. Sizes that fail isEventCorrupted() trigger
 * performRecovery().
 *
 * At end of file the behaviour depends on readTimeout_: TAIL_READ_TIMEOUT
 * keeps polling, NO_TAIL_READ_TIMEOUT returns immediately, and a positive
 * value sleeps that many milliseconds once before giving up.
 *
 * @return a newly allocated event owned by the caller, or NULL if no event
 *         became available
 * @throws TTransportException on a read error (or from performRecovery())
 */
eventInfo* TFileTransport::readEvent() {
  int readTries = 0;

  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) {  // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          usleep(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return NULL;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return NULL;
          } else {
            usleep(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
      }
    }

    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        if (readState_.eventSizeBuffPos_ == 0) {
          if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
               ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
            // a size field cannot straddle a chunk boundary:
            // skip one byte towards the boundary
            readState_.bufferPtr_++;
            continue;
          }
        }

        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
          readBuff_[readState_.bufferPtr_++];
        if (readState_.eventSizeBuffPos_ == 4) {
          // Decode the size with memcpy: eventSizeBuff_ is a byte array, so
          // reading it through a uint32_t* would be an unaligned,
          // type-punned access.
          uint32_t decodedSize = 0;
          memcpy(&decodedSize, readState_.eventSizeBuff_, sizeof(decodedSize));

          // 0 length event indicates padding
          if (decodedSize == 0) {
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          delete readState_.event_;
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = decodedSize;

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          completeEvent->eventBuffPos_ = 0;

          readState_.event_ = NULL;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }

  }
}
616
 
617
bool TFileTransport::isEventCorrupted() {
618
  // an error is triggered if:
619
  if ( (maxEventSize_ > 0) &&  (readState_.event_->eventSize_ > maxEventSize_)) {
620
    // 1. Event size is larger than user-speficied max-event size
621
    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
622
            readState_.event_->eventSize_, maxEventSize_);
623
    return true;
624
  } else if (readState_.event_->eventSize_ > chunkSize_) {
625
    // 2. Event size is larger than chunk size
626
    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
627
               readState_.event_->eventSize_, chunkSize_);
628
    return true;
629
  } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
630
             ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
631
    // 3. size indicates that event crosses chunk boundary
632
    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%ld",
633
            readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);
634
    return true;
635
  }
636
 
637
  return false;
638
}
639
 
640
void TFileTransport::performRecovery() {
641
  // perform some kickass recovery
642
  uint32_t curChunk = getCurChunk();
643
  if (lastBadChunk_ == curChunk) {
644
    numCorruptedEventsInChunk_++;
645
  } else {
646
    lastBadChunk_ = curChunk;
647
    numCorruptedEventsInChunk_ = 1;
648
  }
649
 
650
  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
651
    // maybe there was an error in reading the file from disk
652
    // seek to the beginning of chunk and try again
653
    seekToChunk(curChunk);
654
  } else {
655
 
656
    // just skip ahead to the next chunk if we not already at the last chunk
657
    if (curChunk != (getNumChunks() - 1)) {
658
      seekToChunk(curChunk + 1);
659
    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
660
      // if tailing the file, wait until there is enough data to start
661
      // the next chunk
662
      while(curChunk == (getNumChunks() - 1)) {
663
        usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
664
      }
665
      seekToChunk(curChunk + 1);
666
    } else {
667
      // pretty hosed at this stage, rewind the file back to the last successful
668
      // point and punt on the error
669
      readState_.resetState(readState_.lastDispatchPtr_);
670
      currentEvent_ = NULL;
671
      char errorMsg[1024];
672
      sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
673
              offset_ + readState_.lastDispatchPtr_);
674
      GlobalOutput(errorMsg);
675
      throw TTransportException(errorMsg);
676
    }
677
  }
678
 
679
}
680
 
681
void TFileTransport::seekToChunk(int32_t chunk) {
682
  if (fd_ <= 0) {
683
    throw TTransportException("File not open");
684
  }
685
 
686
  int32_t numChunks = getNumChunks();
687
 
688
  // file is empty, seeking to chunk is pointless
689
  if (numChunks == 0) {
690
    return;
691
  }
692
 
693
  // negative indicates reverse seek (from the end)
694
  if (chunk < 0) {
695
    chunk += numChunks;
696
  }
697
 
698
  // too large a value for reverse seek, just seek to beginning
699
  if (chunk < 0) {
700
    T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk)
701
    chunk = 0;
702
  }
703
 
704
  // cannot seek past EOF
705
  bool seekToEnd = false;
706
  uint32_t minEndOffset = 0;
707
  if (chunk >= numChunks) {
708
    T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");
709
    seekToEnd = true;
710
    chunk = numChunks - 1;
711
    // this is the min offset to process events till
712
    minEndOffset = lseek(fd_, 0, SEEK_END);
713
  }
714
 
715
  off_t newOffset = off_t(chunk) * chunkSize_;
716
  offset_ = lseek(fd_, newOffset, SEEK_SET);
717
  readState_.resetAllValues();
718
  currentEvent_ = NULL;
719
  if (offset_ == -1) {
720
    GlobalOutput("TFileTransport: lseek error in seekToChunk");
721
    throw TTransportException("TFileTransport: lseek error in seekToChunk");
722
  }
723
 
724
  // seek to EOF if user wanted to go to last chunk
725
  if (seekToEnd) {
726
    uint32_t oldReadTimeout = getReadTimeout();
727
    setReadTimeout(NO_TAIL_READ_TIMEOUT);
728
    // keep on reading unti the last event at point of seekChunk call
729
    while (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
730
    setReadTimeout(oldReadTimeout);
731
  }
732
 
733
}
734
 
735
void TFileTransport::seekToEnd() {
736
  seekToChunk(getNumChunks());
737
}
738
 
739
uint32_t TFileTransport::getNumChunks() {
740
  if (fd_ <= 0) {
741
    return 0;
742
  }
743
 
744
  struct stat f_info;
745
  int rv = fstat(fd_, &f_info);
746
 
747
  if (rv < 0) {
748
    int errno_copy = errno;
749
    throw TTransportException(TTransportException::UNKNOWN,
750
                              "TFileTransport::getNumChunks() (fstat)",
751
                              errno_copy);
752
  }
753
 
754
  if (f_info.st_size > 0) {
755
    return ((f_info.st_size)/chunkSize_) + 1;
756
  }
757
 
758
  // empty file has no chunks
759
  return 0;
760
}
761
 
762
uint32_t TFileTransport::getCurChunk() {
763
  return offset_/chunkSize_;
764
}
765
 
766
// Utility Functions
767
void TFileTransport::openLogFile() {
768
  mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
769
  int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
770
  fd_ = ::open(filename_.c_str(), flags, mode);
771
  offset_ = 0;
772
 
773
  // make sure open call was successful
774
  if(fd_ == -1) {
775
    int errno_copy = errno;
776
    GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
777
    throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
778
  }
779
 
780
}
781
 
782
void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
783
  clock_gettime(CLOCK_REALTIME, ts_next_flush);
784
  ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
785
  if (ts_next_flush->tv_nsec > 1000000000) {
786
    ts_next_flush->tv_nsec -= 1000000000;
787
    ts_next_flush->tv_sec += 1;
788
  }
789
  ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
790
}
791
 
792
/**
 * Creates an empty event buffer, initially in WRITE mode.
 *
 * @param size maximum number of events the buffer can hold
 */
TFileTransportBuffer::TFileTransportBuffer(uint32_t size) :
  bufferMode_(WRITE),
  writePoint_(0),
  readPoint_(0),
  size_(size)
{
  // slot array; entries are filled in by addEvent()
  buffer_ = new eventInfo*[size];
}
800
 
801
/**
 * Frees every event still stored in the buffer, then the slot array itself.
 */
TFileTransportBuffer::~TFileTransportBuffer() {
  if (!buffer_) {
    return;
  }

  // only slots below writePoint_ hold events
  uint32_t slot = 0;
  while (slot < writePoint_) {
    delete buffer_[slot];
    ++slot;
  }

  delete[] buffer_;
  buffer_ = NULL;
}
810
 
811
/**
 * Appends an event to the buffer.
 *
 * @param event event to store in the next free slot
 * @return true if the event was stored, false if the buffer is full
 *
 * Adding while the buffer is in READ mode is reported through GlobalOutput
 * but not rejected.
 */
bool TFileTransportBuffer::addEvent(eventInfo *event) {
  if (bufferMode_ == READ) {
    GlobalOutput("Trying to write to a buffer in read mode");
  }

  const bool hasRoom = (writePoint_ < size_);
  if (hasRoom) {
    buffer_[writePoint_] = event;
    writePoint_++;
  }
  return hasRoom;
}
823
 
824
/**
 * Returns the next unread event, switching the buffer from WRITE to READ
 * mode on first use.
 *
 * @return the next event, or NULL when every stored event has been returned
 */
eventInfo* TFileTransportBuffer::getNext() {
  if (bufferMode_ == WRITE) {
    bufferMode_ = READ;
  }

  // no more entries
  if (readPoint_ >= writePoint_) {
    return NULL;
  }

  eventInfo* next = buffer_[readPoint_];
  readPoint_++;
  return next;
}
835
 
836
void TFileTransportBuffer::reset() {
837
  if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
838
    T_DEBUG("Resetting a buffer with unread entries");
839
  }
840
  // Clean up the old entries
841
  for (uint32_t i = 0; i < writePoint_; i++) {
842
    delete buffer_[i];
843
  }
844
  bufferMode_ = WRITE;
845
  writePoint_ = 0;
846
  readPoint_ = 0;
847
}
848
 
849
bool TFileTransportBuffer::isFull() {
850
  return writePoint_ == size_;
851
}
852
 
853
bool TFileTransportBuffer::isEmpty() {
854
  return writePoint_ == 0;
855
}
856
 
857
/**
 * Builds a processor that replays events from `inputTransport`, using one
 * protocol factory for both input and output. Output goes to a freshly
 * created TNullTransport.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> protocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
{
  // default the output transport to a null transport (common case)
  shared_ptr<TNullTransport> nullOutput(new TNullTransport());
  outputTransport_ = nullOutput;
}
868
 
869
/**
 * Builds a processor that replays events from `inputTransport`, with
 * separate protocol factories for input and output. Output goes to a freshly
 * created TNullTransport.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> inputProtocolFactory,
                               shared_ptr<TProtocolFactory> outputProtocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(inputProtocolFactory)
  , outputProtocolFactory_(outputProtocolFactory)
  , inputTransport_(inputTransport)
{
  // default the output transport to a null transport (common case)
  shared_ptr<TNullTransport> nullOutput(new TNullTransport());
  outputTransport_ = nullOutput;
}
881
 
882
/**
 * Builds a processor that replays events from `inputTransport` and sends
 * results to the caller-supplied `outputTransport`, using one protocol
 * factory for both directions.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> protocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport,
                               shared_ptr<TTransport> outputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
  , outputTransport_(outputTransport)
{
}
891
 
892
void TFileProcessor::process(uint32_t numEvents, bool tail) {
893
  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
894
  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
895
 
896
  // set the read timeout to 0 if tailing is required
897
  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
898
  if (tail) {
899
    // save old read timeout so it can be restored
900
    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
901
  }
902
 
903
  uint32_t numProcessed = 0;
904
  while(1) {
905
    // bad form to use exceptions for flow control but there is really
906
    // no other way around it
907
    try {
908
      processor_->process(inputProtocol, outputProtocol);
909
      numProcessed++;
910
      if ( (numEvents > 0) && (numProcessed == numEvents)) {
911
        return;
912
      }
913
    } catch (TEOFException& teof) {
914
      if (!tail) {
915
        break;
916
      }
917
    } catch (TException &te) {
918
      cerr << te.what() << endl;
919
      break;
920
    }
921
  }
922
 
923
  // restore old read timeout
924
  if (tail) {
925
    inputTransport_->setReadTimeout(oldReadTimeout);
926
  }
927
 
928
}
929
 
930
void TFileProcessor::processChunk() {
931
  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
932
  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
933
 
934
  uint32_t curChunk = inputTransport_->getCurChunk();
935
 
936
  while(1) {
937
    // bad form to use exceptions for flow control but there is really
938
    // no other way around it
939
    try {
940
      processor_->process(inputProtocol, outputProtocol);
941
      if (curChunk != inputTransport_->getCurChunk()) {
942
        break;
943
      }
944
    } catch (TEOFException& teof) {
945
      break;
946
    } catch (TException &te) {
947
      cerr << te.what() << endl;
948
      break;
949
    }
950
  }
951
}
952
 
953
}}} // apache::thrift::transport