Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing,* software distributed under the License is distributed on an* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY* KIND, either express or implied. See the License for the* specific language governing permissions and limitations* under the License.*/#ifdef HAVE_CONFIG_H#include "config.h"#endif#include "TFileTransport.h"#include "TTransportUtils.h"#include <pthread.h>#ifdef HAVE_SYS_TIME_H#include <sys/time.h>#else#include <time.h>#endif#include <fcntl.h>#include <errno.h>#include <unistd.h>#ifdef HAVE_STRINGS_H#include <strings.h>#endif#include <cstdlib>#include <cstring>#include <iostream>#include <sys/stat.h>namespace apache { namespace thrift { namespace transport {using boost::shared_ptr;using namespace std;using namespace apache::thrift::protocol;#ifndef HAVE_CLOCK_GETTIME/*** Fake clock_gettime for systems like darwin**/#define CLOCK_REALTIME 0static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {struct timeval now;int rv = gettimeofday(&now, NULL);if (rv != 0) {return rv;}tp->tv_sec = now.tv_sec;tp->tv_nsec = now.tv_usec * 1000;return 0;}#endifTFileTransport::TFileTransport(string path, bool readOnly): readState_(), readBuff_(NULL), currentEvent_(NULL), readBuffSize_(DEFAULT_READ_BUFF_SIZE), readTimeout_(NO_TAIL_READ_TIMEOUT), chunkSize_(DEFAULT_CHUNK_SIZE), eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE), flushMaxUs_(DEFAULT_FLUSH_MAX_US), flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES), maxEventSize_(DEFAULT_MAX_EVENT_SIZE), maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS), eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US), corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US), writerThreadId_(0), dequeueBuffer_(NULL), enqueueBuffer_(NULL), closing_(false), forceFlush_(false), filename_(path), fd_(0), bufferAndThreadInitialized_(false), offset_(0), lastBadChunk_(0), numCorruptedEventsInChunk_(0), readOnly_(readOnly){// initialize all the condition vars/mutexespthread_mutex_init(&mutex_, NULL);pthread_cond_init(¬Full_, NULL);pthread_cond_init(¬Empty_, NULL);pthread_cond_init(&flushed_, NULL);openLogFile();}void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {filename_ = filename;offset_ = offset;// check if current file is still openif (fd_ > 0) {// flush any events in the queueflush();GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());if (-1 == ::close(fd_)) {int errno_copy = errno;GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);}}if (fd) {fd_ = fd;} else {// open file if the input fd is 0openLogFile();}}TFileTransport::~TFileTransport() {// flush the buffer if a writer thread is activeif (writerThreadId_ > 0) {// reduce the flush timeout so that closing is quickersetFlushMaxUs(300*1000);// flush output bufferflush();// set state to closingclosing_ = true;// TODO: make sure event queue is empty// currently only the write buffer is flushed// we dont actually wait until the queue is empty. This shouldn't be a big// deal in the common case because writing is quickpthread_join(writerThreadId_, NULL);writerThreadId_ = 0;}if (dequeueBuffer_) {delete dequeueBuffer_;dequeueBuffer_ = NULL;}if (enqueueBuffer_) {delete enqueueBuffer_;enqueueBuffer_ = NULL;}if (readBuff_) {delete[] readBuff_;readBuff_ = NULL;}if (currentEvent_) {delete currentEvent_;currentEvent_ = NULL;}// close logfileif (fd_ > 0) {if(-1 == ::close(fd_)) {GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno);}}}bool TFileTransport::initBufferAndWriteThread() {if (bufferAndThreadInitialized_) {T_ERROR("Trying to double-init TFileTransport");return false;}if (writerThreadId_ == 0) {if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {T_ERROR("Could not create writer thread");return false;}}dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);bufferAndThreadInitialized_ = true;return true;}void TFileTransport::write(const uint8_t* buf, uint32_t len) {if (readOnly_) {throw TTransportException("TFileTransport: attempting to write to file opened readonly");}enqueueEvent(buf, len, false);}void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {// can't enqueue more events if file is going to closeif (closing_) {return;}// make sure that event size is validif ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);return;}if (eventLen == 0) {T_ERROR("cannot enqueue an empty event");return;}eventInfo* toEnqueue = new eventInfo();toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);// first 4 bytes is the event lengthmemcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);// actual event contentsmemcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);toEnqueue->eventSize_ = eventLen + 4;// lock mutexpthread_mutex_lock(&mutex_);// make sure that enqueue buffer is initialized and writer thread is runningif (!bufferAndThreadInitialized_) {if (!initBufferAndWriteThread()) {delete toEnqueue;pthread_mutex_unlock(&mutex_);return;}}// Can't enqueue while buffer is fullwhile (enqueueBuffer_->isFull()) {pthread_cond_wait(¬Full_, &mutex_);}// add to the bufferif (!enqueueBuffer_->addEvent(toEnqueue)) {delete toEnqueue;pthread_mutex_unlock(&mutex_);return;}// signal anybody who's waiting for the buffer to be non-emptypthread_cond_signal(¬Empty_);if (blockUntilFlush) {pthread_cond_wait(&flushed_, &mutex_);}// this really should be a loop where it makes sure it got flushed// because condition variables can get triggered by the os for no reason// it is probably a non-factor for the time beingpthread_mutex_unlock(&mutex_);}bool TFileTransport::swapEventBuffers(struct timespec* deadline) {pthread_mutex_lock(&mutex_);if (deadline != NULL) {// if we were handed a deadline time struct, do a timed waitpthread_cond_timedwait(¬Empty_, &mutex_, deadline);} else {// just wait until the buffer gets an itempthread_cond_wait(¬Empty_, &mutex_);}bool swapped = false;// could be empty if we timed outif (!enqueueBuffer_->isEmpty()) {TFileTransportBuffer *temp = enqueueBuffer_;enqueueBuffer_ = dequeueBuffer_;dequeueBuffer_ = temp;swapped = true;}// unlock the mutex and signal if requiredpthread_mutex_unlock(&mutex_);if (swapped) {pthread_cond_signal(¬Full_);}return swapped;}void TFileTransport::writerThread() {// open file if it is not openif(!fd_) {openLogFile();}// set the offset to the correct value (EOF)try {seekToEnd();} catch (TException &te) {}// throw away any partial eventsoffset_ += readState_.lastDispatchPtr_;ftruncate(fd_, offset_);readState_.resetAllValues();// Figure out the next time by which a flush must take placestruct timespec ts_next_flush;getNextFlushTime(&ts_next_flush);uint32_t unflushed = 0;while(1) {// this will only be true when the destructor is being invokedif(closing_) {// empty out both the buffersif (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {if (-1 == ::close(fd_)) {int errno_copy = errno;GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);}// just be safe and sync to diskfsync(fd_);fd_ = 0;pthread_exit(NULL);return;}}if (swapEventBuffers(&ts_next_flush)) {eventInfo* outEvent;while (NULL != (outEvent = dequeueBuffer_->getNext())) {if (!outEvent) {T_DEBUG_L(1, "Got an empty event");return;}// sanity check on eventif ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);continue;}// If chunking is required, then make sure that msg does not cross chunk boundaryif ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {// event size must be less than chunk sizeif(outEvent->eventSize_ > chunkSize_) {T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",outEvent->eventSize_, chunkSize_);continue;}int64_t chunk1 = offset_/chunkSize_;int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;// if adding this event will cross a chunk boundary, pad the chunk with zerosif (chunk1 != chunk2) {// refetch the offset to keep in syncoffset_ = lseek(fd_, 0, SEEK_CUR);int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);uint8_t zeros[padding];bzero(zeros, padding);if (-1 == ::write(fd_, zeros, padding)) {int errno_copy = errno;GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);}unflushed += padding;offset_ += padding;}}// write the dequeued event to the fileif (outEvent->eventSize_ > 0) {if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {int errno_copy = errno;GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);}unflushed += outEvent->eventSize_;offset_ += outEvent->eventSize_;}}dequeueBuffer_->reset();}bool flushTimeElapsed = false;struct timespec current_time;clock_gettime(CLOCK_REALTIME, ¤t_time);if (current_time.tv_sec > ts_next_flush.tv_sec ||(current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {flushTimeElapsed = true;getNextFlushTime(&ts_next_flush);}// couple of cases from which a flush could be triggeredif ((flushTimeElapsed && unflushed > 0) ||unflushed > flushMaxBytes_ ||forceFlush_) {// sync (force flush) file to diskfsync(fd_);unflushed = 0;// notify anybody waiting for flush completionforceFlush_ = false;pthread_cond_broadcast(&flushed_);}}}void TFileTransport::flush() {// file must be open for writing for any flushing to take placeif (writerThreadId_ <= 0) {return;}// wait for flush to take placepthread_mutex_lock(&mutex_);forceFlush_ = true;while (forceFlush_) {pthread_cond_wait(&flushed_, &mutex_);}pthread_mutex_unlock(&mutex_);}uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {uint32_t have = 0;uint32_t get = 0;while (have < len) {get = read(buf+have, len-have);if (get <= 0) {throw TEOFException();}have += get;}return have;}uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {// check if there an event is ready to be readif (!currentEvent_) {currentEvent_ = readEvent();}// did not manage to read an event from the file. This could have happened// if the timeout expired or there was some other errorif (!currentEvent_) {return 0;}// read as much of the current event as possibleint32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;if (remaining <= (int32_t)len) {// copy over anything thats remainingif (remaining > 0) {memcpy(buf,currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,remaining);}delete(currentEvent_);currentEvent_ = NULL;return remaining;}// read as much as possiblememcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);currentEvent_->eventBuffPos_ += len;return len;}eventInfo* TFileTransport::readEvent() {int readTries = 0;if (!readBuff_) {readBuff_ = new uint8_t[readBuffSize_];}while (1) {// read from the file if read buffer is exhaustedif (readState_.bufferPtr_ == readState_.bufferLen_) {// advance the offset pointeroffset_ += readState_.bufferLen_;readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);// if (readState_.bufferLen_) {// T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);// }readState_.bufferPtr_ = 0;readState_.lastDispatchPtr_ = 0;// read errorif (readState_.bufferLen_ == -1) {readState_.resetAllValues();GlobalOutput("TFileTransport: error while reading from file");throw TTransportException("TFileTransport: error while reading from file");} else if (readState_.bufferLen_ == 0) { // EOF// wait indefinitely if there is no timeoutif (readTimeout_ == TAIL_READ_TIMEOUT) {usleep(eofSleepTime_);continue;} else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {// reset statereadState_.resetState(0);return NULL;} else if (readTimeout_ > 0) {// timeout already expired onceif (readTries > 0) {readState_.resetState(0);return NULL;} else {usleep(readTimeout_ * 1000);readTries++;continue;}}}}readTries = 0;// attempt to read an event from the bufferwhile(readState_.bufferPtr_ < readState_.bufferLen_) {if (readState_.readingSize_) {if(readState_.eventSizeBuffPos_ == 0) {if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {// skip one byte towards chunk boundary// T_DEBUG_L(1, "Skipping a byte");readState_.bufferPtr_++;continue;}}readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =readBuff_[readState_.bufferPtr_++];if (readState_.eventSizeBuffPos_ == 4) {// 0 length event indicates paddingif (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {// T_DEBUG_L(1, "Got padding");readState_.resetState(readState_.lastDispatchPtr_);continue;}// got a valid eventreadState_.readingSize_ = false;if (readState_.event_) {delete(readState_.event_);}readState_.event_ = new eventInfo();readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));// check if the event is corrupted and perform recovery if requiredif (isEventCorrupted()) {performRecovery();// start from the topbreak;}}} else {if (!readState_.event_->eventBuff_) {readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];readState_.event_->eventBuffPos_ = 0;}// take either the entire event or the remaining bytes in the bufferint reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);// copy data from read buffer into event buffermemcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,readBuff_ + readState_.bufferPtr_,reclaimBuffer);// increment position ptrsreadState_.event_->eventBuffPos_ += reclaimBuffer;readState_.bufferPtr_ += reclaimBuffer;// check if the event has been read in fullif (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {// set the completed event to the current eventeventInfo* completeEvent = readState_.event_;completeEvent->eventBuffPos_ = 0;readState_.event_ = NULL;readState_.resetState(readState_.bufferPtr_);// exit criteriareturn completeEvent;}}}}}bool TFileTransport::isEventCorrupted() {// an error is triggered if:if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {// 1. Event size is larger than user-speficied max-event sizeT_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",readState_.event_->eventSize_, maxEventSize_);return true;} else if (readState_.event_->eventSize_ > chunkSize_) {// 2. Event size is larger than chunk sizeT_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",readState_.event_->eventSize_, chunkSize_);return true;} else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {// 3. size indicates that event crosses chunk boundaryT_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);return true;}return false;}void TFileTransport::performRecovery() {// perform some kickass recoveryuint32_t curChunk = getCurChunk();if (lastBadChunk_ == curChunk) {numCorruptedEventsInChunk_++;} else {lastBadChunk_ = curChunk;numCorruptedEventsInChunk_ = 1;}if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {// maybe there was an error in reading the file from disk// seek to the beginning of chunk and try againseekToChunk(curChunk);} else {// just skip ahead to the next chunk if we not already at the last chunkif (curChunk != (getNumChunks() - 1)) {seekToChunk(curChunk + 1);} else if (readTimeout_ == TAIL_READ_TIMEOUT) {// if tailing the file, wait until there is enough data to start// the next chunkwhile(curChunk == (getNumChunks() - 1)) {usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);}seekToChunk(curChunk + 1);} else {// pretty hosed at this stage, rewind the file back to the last successful// point and punt on the errorreadState_.resetState(readState_.lastDispatchPtr_);currentEvent_ = NULL;char errorMsg[1024];sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",offset_ + readState_.lastDispatchPtr_);GlobalOutput(errorMsg);throw TTransportException(errorMsg);}}}void TFileTransport::seekToChunk(int32_t chunk) {if (fd_ <= 0) {throw TTransportException("File not open");}int32_t numChunks = getNumChunks();// file is empty, seeking to chunk is pointlessif (numChunks == 0) {return;}// negative indicates reverse seek (from the end)if (chunk < 0) {chunk += numChunks;}// too large a value for reverse seek, just seek to beginningif (chunk < 0) {T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk)chunk = 0;}// cannot seek past EOFbool seekToEnd = false;uint32_t minEndOffset = 0;if (chunk >= numChunks) {T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");seekToEnd = true;chunk = numChunks - 1;// this is the min offset to process events tillminEndOffset = lseek(fd_, 0, SEEK_END);}off_t newOffset = off_t(chunk) * chunkSize_;offset_ = lseek(fd_, newOffset, SEEK_SET);readState_.resetAllValues();currentEvent_ = NULL;if (offset_ == -1) {GlobalOutput("TFileTransport: lseek error in seekToChunk");throw TTransportException("TFileTransport: lseek error in seekToChunk");}// seek to EOF if user wanted to go to last chunkif (seekToEnd) {uint32_t oldReadTimeout = getReadTimeout();setReadTimeout(NO_TAIL_READ_TIMEOUT);// keep on reading unti the last event at point of seekChunk callwhile (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};setReadTimeout(oldReadTimeout);}}void TFileTransport::seekToEnd() {seekToChunk(getNumChunks());}uint32_t TFileTransport::getNumChunks() {if (fd_ <= 0) {return 0;}struct stat f_info;int rv = fstat(fd_, &f_info);if (rv < 0) {int errno_copy = errno;throw TTransportException(TTransportException::UNKNOWN,"TFileTransport::getNumChunks() (fstat)",errno_copy);}if (f_info.st_size > 0) {return ((f_info.st_size)/chunkSize_) + 1;}// empty file has no chunksreturn 0;}uint32_t TFileTransport::getCurChunk() {return offset_/chunkSize_;}// Utility Functionsvoid TFileTransport::openLogFile() {mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;fd_ = ::open(filename_.c_str(), flags, mode);offset_ = 0;// make sure open call was successfulif(fd_ == -1) {int errno_copy = errno;GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);}}void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {clock_gettime(CLOCK_REALTIME, ts_next_flush);ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;if (ts_next_flush->tv_nsec > 1000000000) {ts_next_flush->tv_nsec -= 1000000000;ts_next_flush->tv_sec += 1;}ts_next_flush->tv_sec += flushMaxUs_ / 1000000;}TFileTransportBuffer::TFileTransportBuffer(uint32_t size): bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size){buffer_ = new eventInfo*[size];}TFileTransportBuffer::~TFileTransportBuffer() {if (buffer_) {for (uint32_t i = 0; i < writePoint_; i++) {delete buffer_[i];}delete[] buffer_;buffer_ = NULL;}}bool TFileTransportBuffer::addEvent(eventInfo *event) {if (bufferMode_ == READ) {GlobalOutput("Trying to write to a buffer in read mode");}if (writePoint_ < size_) {buffer_[writePoint_++] = event;return true;} else {// buffer is fullreturn false;}}eventInfo* TFileTransportBuffer::getNext() {if (bufferMode_ == WRITE) {bufferMode_ = READ;}if (readPoint_ < writePoint_) {return buffer_[readPoint_++];} else {// no more entriesreturn NULL;}}void TFileTransportBuffer::reset() {if (bufferMode_ == WRITE || writePoint_ > readPoint_) {T_DEBUG("Resetting a buffer with unread entries");}// Clean up the old entriesfor (uint32_t i = 0; i < writePoint_; i++) {delete buffer_[i];}bufferMode_ = WRITE;writePoint_ = 0;readPoint_ = 0;}bool TFileTransportBuffer::isFull() {return writePoint_ == size_;}bool TFileTransportBuffer::isEmpty() {return writePoint_ == 0;}TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport):processor_(processor),inputProtocolFactory_(protocolFactory),outputProtocolFactory_(protocolFactory),inputTransport_(inputTransport) {// default the output transport to a null transport (common case)outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());}TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> inputProtocolFactory,shared_ptr<TProtocolFactory> outputProtocolFactory,shared_ptr<TFileReaderTransport> inputTransport):processor_(processor),inputProtocolFactory_(inputProtocolFactory),outputProtocolFactory_(outputProtocolFactory),inputTransport_(inputTransport) {// default the output transport to a null transport (common case)outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());}TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport,shared_ptr<TTransport> outputTransport):processor_(processor),inputProtocolFactory_(protocolFactory),outputProtocolFactory_(protocolFactory),inputTransport_(inputTransport),outputTransport_(outputTransport) {};void TFileProcessor::process(uint32_t numEvents, bool tail) {shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);// set the read timeout to 0 if tailing is requiredint32_t oldReadTimeout = inputTransport_->getReadTimeout();if (tail) {// save old read timeout so it can be restoredinputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);}uint32_t numProcessed = 0;while(1) {// bad form to use exceptions for flow control but there is really// no other way around ittry {processor_->process(inputProtocol, outputProtocol);numProcessed++;if ( (numEvents > 0) && (numProcessed == numEvents)) {return;}} catch (TEOFException& teof) {if (!tail) {break;}} catch (TException &te) {cerr << te.what() << endl;break;}}// restore old read timeoutif (tail) {inputTransport_->setReadTimeout(oldReadTimeout);}}void TFileProcessor::processChunk() {shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);uint32_t curChunk = inputTransport_->getCurChunk();while(1) {// bad form to use exceptions for flow control but there is really// no other way around ittry {processor_->process(inputProtocol, outputProtocol);if (curChunk != inputTransport_->getCurChunk()) {break;}} catch (TEOFException& teof) {break;} catch (TException &te) {cerr << te.what() << endl;break;}}}}}} // apache::thrift::transport