Rev 301 | Blame | Compare with Previous | Last modification | View Log | RSS feed
// Copyright (c) 2007-2008 Facebook//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.//// See accompanying file LICENSE or visit the Scribe site at:// http://developers.facebook.com/scribe///// @author Bobby Johnson// @author James Wang// @author Jason Sobel// @author Alex Moskalyuk// @author Avinash Lakshman// @author Anthony Giardullo// @author Jan Oravec#ifndef SCRIBE_STORE_H#define SCRIBE_STORE_H#include "common.h" // includes std libs, thrift, and stl typedefs#include "conf.h"#include "file.h"#include "conn_pool.h"/* defines used by the store class */enum roll_period_t {ROLL_NEVER,ROLL_HOURLY,ROLL_DAILY,ROLL_OTHER};/** Abstract class to define the interface for a store* and implement some basic functionality.*/class Store {public:// Creates an object of the appropriate subclass.static boost::shared_ptr<Store>createStore(const std::string& type, const std::string& category,bool readable = false, bool multi_category = false);Store(const std::string& category, const std::string &type,bool multi_category = false);virtual ~Store();virtual boost::shared_ptr<Store> copy(const std::string &category) = 0;virtual bool open() = 0;virtual bool isOpen() = 0;virtual void configure(pStoreConf configuration) = 0;virtual void close() = 0;// Attempts to store messages and returns true if successful.// On failure, returns false and messages contains any un-processed messagesvirtual bool handleMessages(boost::shared_ptr<logentry_vector_t> messages) = 0;virtual void periodicCheck() {}virtual void flush() = 0;virtual std::string getStatus();// following methods must be overidden to make a store readablevirtual bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,struct tm* now);virtual void deleteOldest(struct tm* now);virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,struct tm* now);virtual bool empty(struct tm* now);// don't need to overridevirtual const std::string& getType();protected:virtual void setStatus(const std::string& new_status);std::string status;std::string categoryHandled;bool multiCategory; // Whether multiple categories are handledstd::string storeType;// Don't ever take this lock for multiple stores at the same timepthread_mutex_t statusMutex;private:// disallow copy, assignment, and empty constructionStore(Store& rhs);Store& operator=(Store& rhs);};/** Abstract class that serves as a base for file-based stores.* This class has logic for naming files and deciding when to rotate.*/class FileStoreBase : public Store {public:FileStoreBase(const std::string& category, const std::string &type,bool multi_category);~FileStoreBase();virtual void copyCommon(const FileStoreBase *base);bool open();void configure(pStoreConf configuration);void periodicCheck();protected:// We need to pass arguments to open when called internally.// The external open function just calls this with default args.virtual bool openInternal(bool incrementFilename, struct tm* current_time) = 0;virtual void rotateFile(time_t currentTime = 0);// appends information about the current file to a log file in the same directoryvirtual void printStats();// Returns the number of bytes to pad to align to the specified block sizeunsigned long bytesToPad(unsigned long next_message_length,unsigned long current_file_size,unsigned long chunk_size);// A full filename includes an absolute path and a sequence number suffix.std::string makeBaseFilename(struct tm* creation_time);std::string makeFullFilename(int suffix, struct tm* creation_time,bool use_full_path = true);std::string makeBaseSymlink();std::string makeFullSymlink();int findOldestFile(const std::string& base_filename);int findNewestFile(const std::string& base_filename);int getFileSuffix(const std::string& filename,const std::string& base_filename);void setHostNameSubDir();// Configurationstd::string baseFilePath;std::string subDirectory;std::string filePath;std::string baseFileName;std::string baseSymlinkName;unsigned long maxSize;unsigned long maxWriteSize;roll_period_t rollPeriod;time_t rollPeriodLength;unsigned long rollHour;unsigned long rollMinute;std::string fsType;unsigned long chunkSize;bool writeMeta;bool writeCategory;bool createSymlink;bool writeStats;// Stateunsigned long currentSize;time_t lastRollTime; // either hour, day or time since epoch,// depending on rollPeriodstd::string currentFilename; // this isn't used to choose the next file name,// we just need it for reportingunsigned long eventsWritten; // This is how many events this process has// written to the currently open file. It is NOT// necessarily the number of lines in the fileprivate:// disallow copy, assignment, and empty constructionFileStoreBase(FileStoreBase& rhs);FileStoreBase& operator=(FileStoreBase& rhs);};/** This file-based store uses an instance of a FileInterface class that* handles the details of interfacing with the filesystem. (see file.h)*/class FileStore : public FileStoreBase {public:FileStore(const std::string& category, bool multi_category,bool is_buffer_file = false);~FileStore();boost::shared_ptr<Store> copy(const std::string &category);bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);bool isOpen();void configure(pStoreConf configuration);void close();void flush();// Each read does its own open and close and gets the whole file.// This is separate from the write file, and not really a consistent interface.bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,struct tm* now);virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,struct tm* now);void deleteOldest(struct tm* now);bool empty(struct tm* now);protected:// Implement FileStoreBase virtual functionbool openInternal(bool incrementFilename, struct tm* current_time);bool writeMessages(boost::shared_ptr<logentry_vector_t> messages,boost::shared_ptr<FileInterface> write_file =boost::shared_ptr<FileInterface>());bool isBufferFile;bool addNewlines;// Stateboost::shared_ptr<FileInterface> writeFile;private:// disallow copy, assignment, and empty constructionFileStore(FileStore& rhs);FileStore& operator=(FileStore& rhs);};/** This file-based store relies on thrift's TFileTransport to do the writing*/class ThriftFileStore : public FileStoreBase {public:ThriftFileStore(const std::string& category, bool multi_category);~ThriftFileStore();boost::shared_ptr<Store> copy(const std::string &category);bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);bool open();bool isOpen();void configure(pStoreConf configuration);void close();void flush();bool createFileDirectory();protected:// Implement FileStoreBase virtual functionbool openInternal(bool incrementFilename, struct tm* current_time);boost::shared_ptr<apache::thrift::transport::TTransport> thriftFileTransport;unsigned long flushFrequencyMs;unsigned long msgBufferSize;unsigned long useSimpleFile;private:// disallow copy, assignment, and empty constructionThriftFileStore(ThriftFileStore& rhs);ThriftFileStore& operator=(ThriftFileStore& rhs);};/** This store aggregates messages and sends them to another store* in larger groups. If it is unable to do this it saves them to* a secondary store, then reads them and sends them to the* primary store when it's back online.** This actually involves two buffers. Messages are always buffered* briefly in memory, then they're buffered to a secondary store if* the primary store is down.*/class BufferStore : public Store {public:BufferStore(const std::string& category, bool multi_category);~BufferStore();boost::shared_ptr<Store> copy(const std::string &category);bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);bool open();bool isOpen();void configure(pStoreConf configuration);void close();void flush();void periodicCheck();std::string getStatus();protected:// Store we're trying to get the messages toboost::shared_ptr<Store> primaryStore;// Store to use as a buffer if the primary is unavailable.// The store must be of a type that supports reading.boost::shared_ptr<Store> secondaryStore;// buffer state machineenum buffer_state_t {STREAMING, // connected to primary and sending directlyDISCONNECTED, // disconnected and writing to secondarySENDING_BUFFER, // connected to primary and sending data from secondary};void changeState(buffer_state_t new_state); // handles state pre and post conditionsconst char* stateAsString(buffer_state_t state);time_t getNewRetryInterval(); // generates a random interval based on config// configurationunsigned long maxQueueLength; // in number of messagesunsigned long bufferSendRate; // number of buffer files sent each periodicChecktime_t avgRetryInterval; // in seconds, for retrying primary store opentime_t retryIntervalRange; // in secondsbool replayBuffer; // whether to send buffers from// secondary store to primary// statebuffer_state_t state;time_t lastWriteTime;time_t lastOpenAttempt;time_t retryInterval;private:// disallow copy, assignment, and empty constructionBufferStore();BufferStore(BufferStore& rhs);BufferStore& operator=(BufferStore& rhs);};/** This store sends messages to another scribe server.* This class is really just an adapter to the global* connection pool g_connPool.*/class NetworkStore : public Store {public:NetworkStore(const std::string& category, bool multi_category);~NetworkStore();boost::shared_ptr<Store> copy(const std::string &category);bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);bool open();bool isOpen();void configure(pStoreConf configuration);void close();void flush();protected:static const long int DEFAULT_SOCKET_TIMEOUT_MS = 5000; // 5 sec timeout// configurationbool useConnPool;bool smcBased;long int timeout;std::string remoteHost;unsigned long remotePort; // long because it works with config codestd::string smcService;std::string serviceOptions;server_vector_t servers;unsigned long serviceCacheTimeout;time_t lastServiceCheck;// statebool opened;boost::shared_ptr<scribeConn> unpooledConn; // null if useConnPoolprivate:// disallow copy, assignment, and empty constructionNetworkStore();NetworkStore(NetworkStore& rhs);NetworkStore& operator=(NetworkStore& rhs);};/** This store separates messages into many groups based on a* hash function, and sends each group to a different store.*/class BucketStore : public Store {public:BucketStore(const std::string& category, bool multi_category);~BucketStore();boost::shared_ptr<Store> copy(const std::string &category);bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);bool open();bool isOpen();void configure(pStoreConf configuration);void close();void flush();void periodicCheck();std::string getStatus();protected:enum bucketizer_type {context_log,random, // randomly hash messages without using any keykey_hash, // use hashing to split keys into bucketskey_modulo, // use modulo to split keys into bucketskey_range // use bucketRange to compute modulo to split keys into buckets};bucketizer_type bucketType;char delimiter;bool removeKey;bool opened;unsigned long bucketRange; // used to compute key_range bucketizingunsigned long numBuckets;std::vector<boost::shared_ptr<Store> > buckets;unsigned long bucketize(const std::string& message);std::string getMessageWithoutKey(const std::string& message);private:// disallow copy, assignment, and emtpy constructionBucketStore();BucketStore(BucketStore& rhs);BucketStore& operator=(BucketStore& rhs);void createBucketsFromBucket(pStoreConf configuration,pStoreConf bucket_conf);void createBuckets(pStoreConf configuration);};/** This store intentionally left blank.*/class NullStore : public Store {public:NullStore(const std::string& category, bool multi_category);virtual ~NullStore();boost::shared_ptr<Store> copy(const std::string &category);bool open();bool isOpen();void configure(pStoreConf configuration);void close();bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);void flush();// null stores are readable, but you never get anythingvirtual bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages, struct tm* now);virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,struct tm* now);virtual void deleteOldest(struct tm* now);virtual bool empty(struct tm* now);private:// disallow empty constructor, copy and assignmentNullStore();NullStore(Store& rhs);NullStore& operator=(Store& rhs);};/** This store relays messages to n other stores* @author Joel Seligstein*/class MultiStore : public Store {public:MultiStore(const std::string& category, bool multi_category);~MultiStore();boost::shared_ptr<Store> copy(const std::string &category);bool open();bool isOpen();void configure(pStoreConf configuration);void close();bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);void periodicCheck();void flush();// read won't make sense since we don't know which store to read frombool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,struct tm* now) { return false; }void deleteOldest(struct tm* now) {}bool empty(struct tm* now) { return true; }protected:std::vector<boost::shared_ptr<Store> > stores;enum report_success_value {SUCCESS_ANY = 1,SUCCESS_ALL};report_success_value report_success;private:// disallow copy, assignment, and empty constructionMultiStore();MultiStore(Store& rhs);MultiStore& operator=(Store& rhs);};/** This store will contain a separate store for every distinct* category it encounters.**/class CategoryStore : public Store {public:CategoryStore(const std::string& category, bool multi_category);CategoryStore(const std::string& category, const std::string& name,bool multiCategory);~CategoryStore();boost::shared_ptr<Store> copy(const std::string &category);bool open();bool isOpen();void configure(pStoreConf configuration);void close();bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);void periodicCheck();void flush();protected:void configureCommon(pStoreConf configuration, const std::string type);boost::shared_ptr<Store> modelStore;std::map<std::string, boost::shared_ptr<Store> > stores;private:CategoryStore();CategoryStore(Store& rhs);CategoryStore& operator=(Store& rhs);};/** MultiFileStore is similar to FileStore except that it uses a separate file* for every category. This is useful only if this store can handle mutliple* categories.*/class MultiFileStore : public CategoryStore {public:MultiFileStore(const std::string& category, bool multi_category);~MultiFileStore();void configure(pStoreConf configuration);private:MultiFileStore();MultiFileStore(Store& rhs);MultiFileStore& operator=(Store& rhs);};/** ThriftMultiFileStore is similar to ThriftFileStore except that it uses a* separate thrift file for every category. This is useful only if this store* can handle mutliple categories.*/class ThriftMultiFileStore : public CategoryStore {public:ThriftMultiFileStore(const std::string& category, bool multi_category);~ThriftMultiFileStore();void configure(pStoreConf configuration);private:ThriftMultiFileStore();ThriftMultiFileStore(Store& rhs);ThriftMultiFileStore& operator=(Store& rhs);};#endif // SCRIBE_STORE_H