Subversion Repositories SmartDukaan

Rev

Rev 301 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
//  Copyright (c) 2007-2008 Facebook
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Alex Moskalyuk
// @author Avinash Lakshman
// @author Anthony Giardullo
// @author Jan Oravec
25
 
26
#ifndef SCRIBE_STORE_H
27
#define SCRIBE_STORE_H
28
 
29
#include "common.h" // includes std libs, thrift, and stl typedefs
30
#include "conf.h"
31
#include "file.h"
32
#include "conn_pool.h"
33
 
34
/*
 * Roll period values used by the store classes
 * (see FileStoreBase::rollPeriod).
 *
 * The enumerators keep their original order, so their implicit numeric
 * values (0 through 3) are unchanged.
 */
enum roll_period_t {
  ROLL_NEVER,
  ROLL_HOURLY,
  ROLL_DAILY,
  ROLL_OTHER
};
41
 
42
 
43
/*
44
 * Abstract class to define the interface for a store
45
 * and implement some basic functionality.
46
 */
47
class Store {
48
 public:
49
  // Creates an object of the appropriate subclass.
50
  static boost::shared_ptr<Store>
51
    createStore(const std::string& type, const std::string& category,
52
                bool readable = false, bool multi_category = false);
53
 
54
  Store(const std::string& category, const std::string &type,
55
        bool multi_category = false);
56
  virtual ~Store();
57
 
58
  virtual boost::shared_ptr<Store> copy(const std::string &category) = 0;
59
  virtual bool open() = 0;
60
  virtual bool isOpen() = 0;
61
  virtual void configure(pStoreConf configuration) = 0;
62
  virtual void close() = 0;
63
 
64
  // Attempts to store messages and returns true if successful.
65
  // On failure, returns false and messages contains any un-processed messages
66
  virtual bool handleMessages(boost::shared_ptr<logentry_vector_t> messages) = 0;
67
  virtual void periodicCheck() {}
68
  virtual void flush() = 0;
69
 
70
  virtual std::string getStatus();
71
 
72
  // following methods must be overidden to make a store readable
73
  virtual bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
74
                          struct tm* now);
75
  virtual void deleteOldest(struct tm* now);
76
  virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
77
                             struct tm* now);
78
  virtual bool empty(struct tm* now);
79
 
80
  // don't need to override
81
  virtual const std::string& getType();
82
 
83
 protected:
84
  virtual void setStatus(const std::string& new_status);
85
  std::string status;
86
  std::string categoryHandled;
87
  bool multiCategory;             // Whether multiple categories are handled
88
  std::string storeType;
89
 
90
  // Don't ever take this lock for multiple stores at the same time
91
  pthread_mutex_t statusMutex;
92
 
93
 private:
94
  // disallow copy, assignment, and empty construction
95
  Store(Store& rhs);
96
  Store& operator=(Store& rhs);
97
};
98
 
99
/*
100
 * Abstract class that serves as a base for file-based stores.
101
 * This class has logic for naming files and deciding when to rotate.
102
 */
103
class FileStoreBase : public Store {
104
 public:
105
  FileStoreBase(const std::string& category, const std::string &type,
106
                bool multi_category);
107
  ~FileStoreBase();
108
 
109
  virtual void copyCommon(const FileStoreBase *base);
110
  bool open();
111
  void configure(pStoreConf configuration);
112
  void periodicCheck();
113
 
114
 protected:
115
  // We need to pass arguments to open when called internally.
116
  // The external open function just calls this with default args.
117
  virtual bool openInternal(bool incrementFilename, struct tm* current_time) = 0;
118
  virtual void rotateFile(time_t currentTime = 0);
119
 
120
 
121
  // appends information about the current file to a log file in the same directory
122
  virtual void printStats();
123
 
124
  // Returns the number of bytes to pad to align to the specified block size
125
  unsigned long bytesToPad(unsigned long next_message_length,
126
                           unsigned long current_file_size,
127
                           unsigned long chunk_size);
128
 
129
  // A full filename includes an absolute path and a sequence number suffix.
130
  std::string makeBaseFilename(struct tm* creation_time);
131
  std::string makeFullFilename(int suffix, struct tm* creation_time,
132
                               bool use_full_path = true);
133
  std::string makeBaseSymlink();
134
  std::string makeFullSymlink();
135
  int  findOldestFile(const std::string& base_filename);
136
  int  findNewestFile(const std::string& base_filename);
137
  int  getFileSuffix(const std::string& filename,
138
                     const std::string& base_filename);
139
  void setHostNameSubDir();
140
 
141
  // Configuration
142
  std::string baseFilePath;
143
  std::string subDirectory;
144
  std::string filePath;
145
  std::string baseFileName;
146
  std::string baseSymlinkName;
147
  unsigned long maxSize;
148
  unsigned long maxWriteSize;
149
  roll_period_t rollPeriod;
150
  time_t rollPeriodLength;
151
  unsigned long rollHour;
152
  unsigned long rollMinute;
153
  std::string fsType;
154
  unsigned long chunkSize;
155
  bool writeMeta;
156
  bool writeCategory;
157
  bool createSymlink;
158
  bool writeStats;
159
 
160
  // State
161
  unsigned long currentSize;
162
  time_t lastRollTime;         // either hour, day or time since epoch,
163
                               // depending on rollPeriod
164
  std::string currentFilename; // this isn't used to choose the next file name,
165
                               // we just need it for reporting
166
  unsigned long eventsWritten; // This is how many events this process has
167
                               // written to the currently open file. It is NOT
168
                               // necessarily the number of lines in the file
169
 
170
 private:
171
  // disallow copy, assignment, and empty construction
172
  FileStoreBase(FileStoreBase& rhs);
173
  FileStoreBase& operator=(FileStoreBase& rhs);
174
};
175
 
176
/*
177
 * This file-based store uses an instance of a FileInterface class that
178
 * handles the details of interfacing with the filesystem. (see file.h)
179
 */
180
class FileStore : public FileStoreBase {
181
 
182
 public:
183
  FileStore(const std::string& category, bool multi_category,
184
            bool is_buffer_file = false);
185
  ~FileStore();
186
 
187
  boost::shared_ptr<Store> copy(const std::string &category);
188
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
189
  bool isOpen();
190
  void configure(pStoreConf configuration);
191
  void close();
192
  void flush();
193
 
194
  // Each read does its own open and close and gets the whole file.
195
  // This is separate from the write file, and not really a consistent interface.
196
  bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
197
                  struct tm* now);
198
  virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
199
                             struct tm* now);
200
  void deleteOldest(struct tm* now);
201
  bool empty(struct tm* now);
202
 
203
 protected:
204
  // Implement FileStoreBase virtual function
205
  bool openInternal(bool incrementFilename, struct tm* current_time);
206
  bool writeMessages(boost::shared_ptr<logentry_vector_t> messages,
207
                     boost::shared_ptr<FileInterface> write_file =
208
                     boost::shared_ptr<FileInterface>());
209
 
210
  bool isBufferFile;
211
  bool addNewlines;
212
 
213
  // State
214
  boost::shared_ptr<FileInterface> writeFile;
215
 
216
 private:
217
  // disallow copy, assignment, and empty construction
218
  FileStore(FileStore& rhs);
219
  FileStore& operator=(FileStore& rhs);
220
};
221
 
222
/*
223
 * This file-based store relies on thrift's TFileTransport to do the writing
224
 */
225
class ThriftFileStore : public FileStoreBase {
226
 public:
227
  ThriftFileStore(const std::string& category, bool multi_category);
228
  ~ThriftFileStore();
229
 
230
  boost::shared_ptr<Store> copy(const std::string &category);
231
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
232
  bool open();
233
  bool isOpen();
234
  void configure(pStoreConf configuration);
235
  void close();
236
  void flush();
237
  bool createFileDirectory();
238
 
239
 protected:
240
  // Implement FileStoreBase virtual function
241
  bool openInternal(bool incrementFilename, struct tm* current_time);
242
 
243
  boost::shared_ptr<apache::thrift::transport::TTransport> thriftFileTransport;
244
 
245
  unsigned long flushFrequencyMs;
246
  unsigned long msgBufferSize;
247
  unsigned long useSimpleFile;
248
 
249
 private:
250
  // disallow copy, assignment, and empty construction
251
  ThriftFileStore(ThriftFileStore& rhs);
252
  ThriftFileStore& operator=(ThriftFileStore& rhs);
253
};
254
 
255
/*
256
 * This store aggregates messages and sends them to another store
257
 * in larger groups. If it is unable to do this it saves them to
258
 * a secondary store, then reads them and sends them to the
259
 * primary store when it's back online.
260
 *
261
 * This actually involves two buffers. Messages are always buffered
262
 * briefly in memory, then they're buffered to a secondary store if
263
 * the primary store is down.
264
 */
265
class BufferStore : public Store {
266
 
267
 public:
268
  BufferStore(const std::string& category, bool multi_category);
269
  ~BufferStore();
270
 
271
  boost::shared_ptr<Store> copy(const std::string &category);
272
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
273
  bool open();
274
  bool isOpen();
275
  void configure(pStoreConf configuration);
276
  void close();
277
  void flush();
278
  void periodicCheck();
279
 
280
  std::string getStatus();
281
 
282
 protected:
283
  // Store we're trying to get the messages to
284
  boost::shared_ptr<Store> primaryStore;
285
 
286
  // Store to use as a buffer if the primary is unavailable.
287
  // The store must be of a type that supports reading.
288
  boost::shared_ptr<Store> secondaryStore;
289
 
290
  // buffer state machine
291
  enum buffer_state_t {
292
    STREAMING,       // connected to primary and sending directly
293
    DISCONNECTED,    // disconnected and writing to secondary
294
    SENDING_BUFFER,  // connected to primary and sending data from secondary
295
  };
296
 
297
  void changeState(buffer_state_t new_state); // handles state pre and post conditions
298
  const char* stateAsString(buffer_state_t state);
299
 
300
  time_t getNewRetryInterval(); // generates a random interval based on config
301
 
302
  // configuration
303
  unsigned long maxQueueLength;   // in number of messages
304
  unsigned long bufferSendRate;   // number of buffer files sent each periodicCheck
305
  time_t avgRetryInterval;        // in seconds, for retrying primary store open
306
  time_t retryIntervalRange;      // in seconds
307
  bool   replayBuffer;            // whether to send buffers from
308
                                  // secondary store to primary
309
 
310
  // state
311
  buffer_state_t state;
312
  time_t lastWriteTime;
313
  time_t lastOpenAttempt;
314
  time_t retryInterval;
315
 
316
 private:
317
  // disallow copy, assignment, and empty construction
318
  BufferStore();
319
  BufferStore(BufferStore& rhs);
320
  BufferStore& operator=(BufferStore& rhs);
321
};
322
 
323
/*
324
 * This store sends messages to another scribe server.
325
 * This class is really just an adapter to the global
326
 * connection pool g_connPool.
327
 */
328
class NetworkStore : public Store {
329
 
330
 public:
331
  NetworkStore(const std::string& category, bool multi_category);
332
  ~NetworkStore();
333
 
334
  boost::shared_ptr<Store> copy(const std::string &category);
335
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
336
  bool open();
337
  bool isOpen();
338
  void configure(pStoreConf configuration);
339
  void close();
340
  void flush();
341
 
342
 protected:
343
  static const long int DEFAULT_SOCKET_TIMEOUT_MS = 5000; // 5 sec timeout
344
 
345
  // configuration
346
  bool useConnPool;
347
  bool smcBased;
348
  long int timeout;
349
  std::string remoteHost;
350
  unsigned long remotePort; // long because it works with config code
351
  std::string smcService;
352
  std::string serviceOptions;
353
  server_vector_t servers;
354
  unsigned long serviceCacheTimeout;
355
  time_t lastServiceCheck;
356
 
357
  // state
358
  bool opened;
359
  boost::shared_ptr<scribeConn> unpooledConn; // null if useConnPool
360
 
361
 private:
362
  // disallow copy, assignment, and empty construction
363
  NetworkStore();
364
  NetworkStore(NetworkStore& rhs);
365
  NetworkStore& operator=(NetworkStore& rhs);
366
};
367
 
368
/*
369
 * This store separates messages into many groups based on a
370
 * hash function, and sends each group to a different store.
371
 */
372
class BucketStore : public Store {
373
 
374
 public:
375
  BucketStore(const std::string& category, bool multi_category);
376
  ~BucketStore();
377
 
378
  boost::shared_ptr<Store> copy(const std::string &category);
379
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
380
  bool open();
381
  bool isOpen();
382
  void configure(pStoreConf configuration);
383
  void close();
384
  void flush();
385
  void periodicCheck();
386
 
387
  std::string getStatus();
388
 
389
 protected:
390
  enum bucketizer_type {
391
    context_log,
392
    random,      // randomly hash messages without using any key
393
    key_hash,    // use hashing to split keys into buckets
394
    key_modulo,  // use modulo to split keys into buckets
395
    key_range    // use bucketRange to compute modulo to split keys into buckets
396
  };
397
 
398
  bucketizer_type bucketType;
399
  char delimiter;
400
  bool removeKey;
401
  bool opened;
402
  unsigned long bucketRange;  // used to compute key_range bucketizing
403
  unsigned long numBuckets;
404
  std::vector<boost::shared_ptr<Store> > buckets;
405
 
406
  unsigned long bucketize(const std::string& message);
407
  std::string getMessageWithoutKey(const std::string& message);
408
 
409
 private:
410
  // disallow copy, assignment, and emtpy construction
411
  BucketStore();
412
  BucketStore(BucketStore& rhs);
413
  BucketStore& operator=(BucketStore& rhs);
414
  void createBucketsFromBucket(pStoreConf configuration,
415
                               pStoreConf bucket_conf);
416
  void createBuckets(pStoreConf configuration);
417
};
418
 
419
/*
420
 * This store intentionally left blank.
421
 */
422
class NullStore : public Store {
423
 
424
 public:
425
  NullStore(const std::string& category, bool multi_category);
426
  virtual ~NullStore();
427
 
428
  boost::shared_ptr<Store> copy(const std::string &category);
429
  bool open();
430
  bool isOpen();
431
  void configure(pStoreConf configuration);
432
  void close();
433
 
434
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
435
  void flush();
436
 
437
  // null stores are readable, but you never get anything
438
  virtual bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,                          struct tm* now);
439
  virtual bool replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
440
                             struct tm* now);
441
  virtual void deleteOldest(struct tm* now);
442
  virtual bool empty(struct tm* now);
443
 
444
 
445
 private:
446
  // disallow empty constructor, copy and assignment
447
  NullStore();
448
  NullStore(Store& rhs);
449
  NullStore& operator=(Store& rhs);
450
};
451
 
452
/*
453
 * This store relays messages to n other stores
454
 * @author Joel Seligstein
455
 */
456
class MultiStore : public Store {
457
 public:
458
  MultiStore(const std::string& category, bool multi_category);
459
  ~MultiStore();
460
 
461
  boost::shared_ptr<Store> copy(const std::string &category);
462
  bool open();
463
  bool isOpen();
464
  void configure(pStoreConf configuration);
465
  void close();
466
 
467
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
468
  void periodicCheck();
469
  void flush();
470
 
471
  // read won't make sense since we don't know which store to read from
472
  bool readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
473
                  struct tm* now) { return false; }
474
  void deleteOldest(struct tm* now) {}
475
  bool empty(struct tm* now) { return true; }
476
 
477
 protected:
478
  std::vector<boost::shared_ptr<Store> > stores;
479
  enum report_success_value {
480
    SUCCESS_ANY = 1,
481
    SUCCESS_ALL
482
  };
483
  report_success_value report_success;
484
 
485
 private:
486
  // disallow copy, assignment, and empty construction
487
  MultiStore();
488
  MultiStore(Store& rhs);
489
  MultiStore& operator=(Store& rhs);
490
};
491
 
492
 
493
/*
494
 * This store will contain a separate store for every distinct
495
 * category it encounters.
496
 *
497
 */
498
class CategoryStore : public Store {
499
 public:
500
  CategoryStore(const std::string& category, bool multi_category);
501
  CategoryStore(const std::string& category, const std::string& name,
502
                bool multiCategory);
503
  ~CategoryStore();
504
 
505
  boost::shared_ptr<Store> copy(const std::string &category);
506
  bool open();
507
  bool isOpen();
508
  void configure(pStoreConf configuration);
509
  void close();
510
 
511
  bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
512
  void periodicCheck();
513
  void flush();
514
 
515
 protected:
516
  void configureCommon(pStoreConf configuration, const std::string type);
517
  boost::shared_ptr<Store> modelStore;
518
  std::map<std::string, boost::shared_ptr<Store> > stores;
519
 
520
 private:
521
  CategoryStore();
522
  CategoryStore(Store& rhs);
523
  CategoryStore& operator=(Store& rhs);
524
};
525
 
526
/*
527
 * MultiFileStore is similar to FileStore except that it uses a separate file
528
 * for every category.  This is useful only if this store can handle mutliple
529
 * categories.
530
 */
531
class MultiFileStore : public CategoryStore {
532
 public:
533
  MultiFileStore(const std::string& category, bool multi_category);
534
  ~MultiFileStore();
535
  void configure(pStoreConf configuration);
536
 
537
 private:
538
  MultiFileStore();
539
  MultiFileStore(Store& rhs);
540
  MultiFileStore& operator=(Store& rhs);
541
};
542
 
543
/*
544
 * ThriftMultiFileStore is similar to ThriftFileStore except that it uses a
545
 * separate thrift file for every category.  This is useful only if this store
546
 * can handle mutliple categories.
547
 */
548
class ThriftMultiFileStore : public CategoryStore {
549
 public:
550
  ThriftMultiFileStore(const std::string& category, bool multi_category);
551
  ~ThriftMultiFileStore();
552
  void configure(pStoreConf configuration);
553
 
554
 
555
 private:
556
  ThriftMultiFileStore();
557
  ThriftMultiFileStore(Store& rhs);
558
  ThriftMultiFileStore& operator=(Store& rhs);
559
};
560
#endif // SCRIBE_STORE_H