Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing,* software distributed under the License is distributed on an* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY* KIND, either express or implied. See the License for the* specific language governing permissions and limitations* under the License.*/#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1#include "TTransport.h"#include "Thrift.h"#include "TProcessor.h"#include <string>#include <stdio.h>#include <pthread.h>#include <boost/shared_ptr.hpp>namespace apache { namespace thrift { namespace transport {using apache::thrift::TProcessor;using apache::thrift::protocol::TProtocolFactory;// Data pertaining to a single eventtypedef struct eventInfo {uint8_t* eventBuff_;uint32_t eventSize_;uint32_t eventBuffPos_;eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};~eventInfo() {if (eventBuff_) {delete[] eventBuff_;}}} eventInfo;// information about current read statetypedef struct readState {eventInfo* event_;// keep track of event sizeuint8_t eventSizeBuff_[4];uint8_t eventSizeBuffPos_;bool readingSize_;// read buffer variablesint32_t bufferPtr_;int32_t bufferLen_;// last successful dispatch pointint32_t lastDispatchPtr_;void resetState(uint32_t lastDispatchPtr) {readingSize_ = true;eventSizeBuffPos_ = 0;lastDispatchPtr_ = lastDispatchPtr;}void resetAllValues() {resetState(0);bufferPtr_ = 0;bufferLen_ = 0;if (event_) {delete(event_);}event_ = 0;}readState() {event_ = 0;resetAllValues();}~readState() {if (event_) {delete(event_);}}} readState;/*** TFileTransportBuffer - buffer class used by TFileTransport for queueing up events* to be written to disk. Should be used in the following way:* 1) Buffer created* 2) Buffer written to (addEvent)* 3) Buffer read from (getNext)* 4) Buffer reset (reset)* 5) Go back to 2, or destroy buffer** The buffer should never be written to after it is read from, unless it is reset first.* Note: The above rules are enforced mainly for debugging its sole client TFileTransport* which uses the buffer in this way.**/class TFileTransportBuffer {public:TFileTransportBuffer(uint32_t size);~TFileTransportBuffer();bool addEvent(eventInfo *event);eventInfo* getNext();void reset();bool isFull();bool isEmpty();private:TFileTransportBuffer(); // should not be usedenum mode {WRITE,READ};mode bufferMode_;uint32_t writePoint_;uint32_t readPoint_;uint32_t size_;eventInfo** buffer_;};/*** Abstract interface for transports used to read files*/class TFileReaderTransport : virtual public TTransport {public:virtual int32_t getReadTimeout() = 0;virtual void setReadTimeout(int32_t readTimeout) = 0;virtual uint32_t getNumChunks() = 0;virtual uint32_t getCurChunk() = 0;virtual void seekToChunk(int32_t chunk) = 0;virtual void seekToEnd() = 0;};/*** Abstract interface for transports used to write files*/class TFileWriterTransport : virtual public TTransport {public:virtual uint32_t getChunkSize() = 0;virtual void setChunkSize(uint32_t chunkSize) = 0;};/*** File implementation of a transport. Reads and writes are done to a* file on disk.**/class TFileTransport : public TFileReaderTransport,public TFileWriterTransport {public:TFileTransport(std::string path, bool readOnly=false);~TFileTransport();// TODO: what is the correct behaviour for this?// the log file is generally always openbool isOpen() {return true;}void write(const uint8_t* buf, uint32_t len);void flush();uint32_t readAll(uint8_t* buf, uint32_t len);uint32_t read(uint8_t* buf, uint32_t len);// log-file specific functionsvoid seekToChunk(int32_t chunk);void seekToEnd();uint32_t getNumChunks();uint32_t getCurChunk();// for changing the output filevoid resetOutputFile(int fd, std::string filename, int64_t offset);// Setter/Getter functions for user-controllable optionsvoid setReadBuffSize(uint32_t readBuffSize) {if (readBuffSize) {readBuffSize_ = readBuffSize;}}uint32_t getReadBuffSize() {return readBuffSize_;}static const int32_t TAIL_READ_TIMEOUT = -1;static const int32_t NO_TAIL_READ_TIMEOUT = 0;void setReadTimeout(int32_t readTimeout) {readTimeout_ = readTimeout;}int32_t getReadTimeout() {return readTimeout_;}void setChunkSize(uint32_t chunkSize) {if (chunkSize) {chunkSize_ = chunkSize;}}uint32_t getChunkSize() {return chunkSize_;}void setEventBufferSize(uint32_t bufferSize) {if (bufferAndThreadInitialized_) {GlobalOutput("Cannot change the buffer size after writer thread started");return;}eventBufferSize_ = bufferSize;}uint32_t getEventBufferSize() {return eventBufferSize_;}void setFlushMaxUs(uint32_t flushMaxUs) {if (flushMaxUs) {flushMaxUs_ = flushMaxUs;}}uint32_t getFlushMaxUs() {return flushMaxUs_;}void setFlushMaxBytes(uint32_t flushMaxBytes) {if (flushMaxBytes) {flushMaxBytes_ = flushMaxBytes;}}uint32_t getFlushMaxBytes() {return flushMaxBytes_;}void setMaxEventSize(uint32_t maxEventSize) {maxEventSize_ = maxEventSize;}uint32_t getMaxEventSize() {return maxEventSize_;}void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {maxCorruptedEvents_ = maxCorruptedEvents;}uint32_t getMaxCorruptedEvents() {return maxCorruptedEvents_;}void setEofSleepTimeUs(uint32_t eofSleepTime) {if (eofSleepTime) {eofSleepTime_ = eofSleepTime;}}uint32_t getEofSleepTimeUs() {return eofSleepTime_;}private:// helper functions for writing to a filevoid enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);bool swapEventBuffers(struct timespec* deadline);bool initBufferAndWriteThread();// control for writer threadstatic void* startWriterThread(void* ptr) {(((TFileTransport*)ptr)->writerThread());return 0;}void writerThread();// helper functions for reading from a fileeventInfo* readEvent();// event corruption-related functionsbool isEventCorrupted();void performRecovery();// Utility functionsvoid openLogFile();void getNextFlushTime(struct timespec* ts_next_flush);// Class variablesreadState readState_;uint8_t* readBuff_;eventInfo* currentEvent_;uint32_t readBuffSize_;static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;int32_t readTimeout_;static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;// size of chunks that file will be split up intouint32_t chunkSize_;static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;// size of event buffersuint32_t eventBufferSize_;static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;// max number of microseconds that can pass without flushinguint32_t flushMaxUs_;static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;// max number of bytes that can be written without flushinguint32_t flushMaxBytes_;static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;// max event sizeuint32_t maxEventSize_;static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;// max number of corrupted events per chunkuint32_t maxCorruptedEvents_;static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;// sleep duration when EOF is hituint32_t eofSleepTime_;static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;// sleep duration when a corrupted event is encountereduint32_t corruptedEventSleepTime_;static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;// writer thread idpthread_t writerThreadId_;// buffers to hold data before it is flushed. Each element of the buffer stores a msg that// needs to be written to the file. The buffers are swapped by the writer thread.TFileTransportBuffer *dequeueBuffer_;TFileTransportBuffer *enqueueBuffer_;// conditions used to block when the buffer is full or emptypthread_cond_t notFull_, notEmpty_;volatile bool closing_;// To keep track of whether the buffer has been flushedpthread_cond_t flushed_;volatile bool forceFlush_;// Mutex that is grabbed when enqueueing and swapping the read/write bufferspthread_mutex_t mutex_;// File informationstd::string filename_;int fd_;// Whether the writer thread and buffers have been initializedbool bufferAndThreadInitialized_;// Offset within the fileoff_t offset_;// event corruption informationuint32_t lastBadChunk_;uint32_t numCorruptedEventsInChunk_;bool readOnly_;};// Exception thrown when EOF is hitclass TEOFException : public TTransportException {public:TEOFException():TTransportException(TTransportException::END_OF_FILE) {};};// wrapper class to process events from a file containing thrift eventsclass TFileProcessor {public:/*** Constructor that defaults output transport to null transport** @param processor processes log-file events* @param protocolFactory protocol factory* @param inputTransport file transport*/TFileProcessor(boost::shared_ptr<TProcessor> processor,boost::shared_ptr<TProtocolFactory> protocolFactory,boost::shared_ptr<TFileReaderTransport> inputTransport);TFileProcessor(boost::shared_ptr<TProcessor> processor,boost::shared_ptr<TProtocolFactory> inputProtocolFactory,boost::shared_ptr<TProtocolFactory> outputProtocolFactory,boost::shared_ptr<TFileReaderTransport> inputTransport);/*** Constructor** @param processor processes log-file events* @param protocolFactory protocol factory* @param inputTransport input file transport* @param output output transport*/TFileProcessor(boost::shared_ptr<TProcessor> processor,boost::shared_ptr<TProtocolFactory> protocolFactory,boost::shared_ptr<TFileReaderTransport> inputTransport,boost::shared_ptr<TTransport> outputTransport);/*** processes events from the file** @param numEvents number of events to process (0 for unlimited)* @param tail tails the file if true*/void process(uint32_t numEvents, bool tail);/*** process events until the end of the chunk**/void processChunk();private:boost::shared_ptr<TProcessor> processor_;boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;boost::shared_ptr<TFileReaderTransport> inputTransport_;boost::shared_ptr<TTransport> outputTransport_;};}}} // apache::thrift::transport#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_