// Source: Subversion Repositories SmartDukaan, Rev 301 (author: ashish)
//
//  Copyright (c) 2007-2008 Facebook
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Alex Moskalyuk
// @author Avinash Lakshman
// @author Anthony Giardullo
// @author Jan Oravec
25
 
26
#include "common.h"
27
#include "scribe_server.h"
28
#include "thrift/transport/TSimpleFileTransport.h"
29
 
30
using namespace std;
31
using namespace boost;
32
using namespace boost::filesystem;
33
using namespace apache::thrift;
34
using namespace apache::thrift::protocol;
35
using namespace apache::thrift::transport;
36
using namespace apache::thrift::server;
37
using namespace scribe::thrift;
38
 
39
#define DEFAULT_FILESTORE_MAX_SIZE               1000000000
40
#define DEFAULT_FILESTORE_MAX_WRITE_SIZE         1000000
41
#define DEFAULT_FILESTORE_ROLL_HOUR              1
42
#define DEFAULT_FILESTORE_ROLL_MINUTE            15
43
#define DEFAULT_BUFFERSTORE_MAX_QUEUE_LENGTH     2000000
44
#define DEFAULT_BUFFERSTORE_SEND_RATE            1
45
#define DEFAULT_BUFFERSTORE_AVG_RETRY_INTERVAL   300
46
#define DEFAULT_BUFFERSTORE_RETRY_INTERVAL_RANGE 60
47
#define DEFAULT_BUCKETSTORE_DELIMITER            ':'
48
#define DEFAULT_NETWORKSTORE_CACHE_TIMEOUT       300
49
 
50
ConnPool g_connPool;
51
 
52
const string meta_logfile_prefix = "scribe_meta<new_logfile>: ";
53
 
54
// Factory for the store implementations known to this file.
//
// @param type           configured store type name ("file", "buffer", ...)
// @param category       category the new store will handle
// @param readable       only used for "file": forwarded as FileStore's
//                       is_buffer_file argument
// @param multi_category forwarded to every store constructor
// @return the new store, or an empty pointer when `type` is not recognised
boost::shared_ptr<Store>
Store::createStore(const string& type, const string& category,
                   bool readable, bool multi_category) {
  shared_ptr<Store> store;  // stays empty for unknown types

  // reset(new Derived) keeps the concrete type's deleter, exactly like
  // constructing shared_ptr<Store>(new Derived) directly.
  if (type == "file") {
    store.reset(new FileStore(category, multi_category, readable));
  } else if (type == "buffer") {
    store.reset(new BufferStore(category, multi_category));
  } else if (type == "network") {
    store.reset(new NetworkStore(category, multi_category));
  } else if (type == "bucket") {
    store.reset(new BucketStore(category, multi_category));
  } else if (type == "thriftfile") {
    store.reset(new ThriftFileStore(category, multi_category));
  } else if (type == "null") {
    store.reset(new NullStore(category, multi_category));
  } else if (type == "multi") {
    store.reset(new MultiStore(category, multi_category));
  } else if (type == "category") {
    store.reset(new CategoryStore(category, multi_category));
  } else if (type == "multifile") {
    store.reset(new MultiFileStore(category, multi_category));
  } else if (type == "thriftmultifile") {
    store.reset(new ThriftMultiFileStore(category, multi_category));
  }

  return store;
}
81
 
82
// Base constructor shared by every store.
//
// @param category       category this store handles; kept in categoryHandled,
//                       which prefixes the log messages in this file
// @param type           store type name, returned by getType()
// @param multi_category kept in multiCategory
Store::Store(const string& category, const string &type, bool multi_category)
  : categoryHandled(category),
    multiCategory(multi_category),
    storeType(type) {
  // statusMutex guards `status` (see setStatus()/getStatus()).
  pthread_mutex_init(&statusMutex, NULL);
}
88
 
89
// Tears down the status mutex created by the constructor.
Store::~Store() {
  pthread_mutex_t* status_lock = &statusMutex;
  pthread_mutex_destroy(status_lock);
}
92
 
93
void Store::setStatus(const std::string& new_status) {
94
  pthread_mutex_lock(&statusMutex);
95
  status = new_status;
96
  pthread_mutex_unlock(&statusMutex);
97
}
98
 
99
std::string Store::getStatus() {
100
  pthread_mutex_lock(&statusMutex);
101
  std::string return_status(status);
102
  pthread_mutex_unlock(&statusMutex);
103
  return return_status;
104
}
105
 
106
bool Store::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
107
                       struct tm* now) {
108
  LOG_OPER("[%s] ERROR: attempting to read from a write-only store", categoryHandled.c_str());
109
  return false;
110
}
111
 
112
bool Store::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
113
                          struct tm* now) {
114
  LOG_OPER("[%s] ERROR: attempting to read from a write-only store", categoryHandled.c_str());
115
  return false;
116
}
117
 
118
void Store::deleteOldest(struct tm* now) {
119
   LOG_OPER("[%s] ERROR: attempting to read from a write-only store", categoryHandled.c_str());
120
}
121
 
122
bool Store::empty(struct tm* now) {
123
  LOG_OPER("[%s] ERROR: attempting to read from a write-only store", categoryHandled.c_str());
124
  return true;
125
}
126
 
127
const std::string& Store::getType() {
128
  return storeType;
129
}
130
 
131
// Sets every FileStoreBase option to its default; configure() or copyCommon()
// may override them afterwards.  Until configured, files go to /tmp and are
// named after the category.
FileStoreBase::FileStoreBase(const string& category, const string &type,
                             bool multi_category)
  : Store(category, type, multi_category),
    baseFilePath("/tmp"),
    subDirectory(""),
    filePath("/tmp"),              // baseFilePath [+ "/" + subDirectory]
    baseFileName(category),
    baseSymlinkName(""),           // empty: symlink is named after baseFileName
    maxSize(DEFAULT_FILESTORE_MAX_SIZE),
    maxWriteSize(DEFAULT_FILESTORE_MAX_WRITE_SIZE),
    rollPeriod(ROLL_NEVER),        // no time-based rotation by default
    rollPeriodLength(0),           // only meaningful for ROLL_OTHER
    rollHour(DEFAULT_FILESTORE_ROLL_HOUR),
    rollMinute(DEFAULT_FILESTORE_ROLL_MINUTE),
    fsType("std"),
    chunkSize(0),                  // 0 disables chunk padding (see bytesToPad)
    writeMeta(false),
    writeCategory(false),
    createSymlink(true),
    writeStats(false),
    currentSize(0),
    lastRollTime(0),
    eventsWritten(0) {
}
155
 
156
// Nothing to release explicitly.
FileStoreBase::~FileStoreBase() {}
158
 
159
// Reads the settings shared by all file-based stores.  Every key is optional
// and a missing key leaves the constructor default in place; only a few
// suspicious configurations are reported.
//
// Keys: file_path, sub_directory, use_hostname_sub_directory, base_filename,
// base_symlink_name, rotate_period, write_meta, write_category,
// create_symlink, write_stats, fs_type, max_size, max_write_size,
// rotate_hour, rotate_minute, chunk_size.
void FileStoreBase::configure(pStoreConf configuration) {
  std::string value;

  // ---- location -----------------------------------------------------------
  configuration->getString("file_path", baseFilePath);
  configuration->getString("sub_directory", subDirectory);
  configuration->getString("use_hostname_sub_directory", value);
  if (value == "yes") {
    // replaces sub_directory with this machine's host name
    setHostNameSubDir();
  }

  filePath = baseFilePath;
  if (!subDirectory.empty()) {
    filePath += "/" + subDirectory;
  }

  if (!configuration->getString("base_filename", baseFileName)) {
    LOG_OPER("[%s] WARNING: Bad config - no base_filename specified for file store", categoryHandled.c_str());
  }

  // optional name for the "_current" symlink (defaults to base_filename)
  configuration->getString("base_symlink_name", baseSymlinkName);

  // ---- rotation -----------------------------------------------------------
  if (configuration->getString("rotate_period", value)) {
    if (value == "hourly") {
      rollPeriod = ROLL_HOURLY;
    } else if (value == "daily") {
      rollPeriod = ROLL_DAILY;
    } else if (value == "never") {
      rollPeriod = ROLL_NEVER;
    } else {
      // Custom period: a positive integer optionally followed by one unit
      // letter (s, m, h, d, w); without a unit the value is in seconds.
      rollPeriod = ROLL_OTHER;
      errno = 0;
      char* unit = NULL;
      rollPeriodLength = strtol(value.c_str(), &unit, 10);

      bool valid = errno == 0 && rollPeriodLength > 0 && unit != value.c_str() &&
                   (*unit == '\0' || unit[1] == '\0');

      int seconds_per_unit = 1;
      switch (*unit) {
        case 'w': seconds_per_unit = 60 * 60 * 24 * 7; break;
        case 'd': seconds_per_unit = 60 * 60 * 24;     break;
        case 'h': seconds_per_unit = 60 * 60;          break;
        case 'm': seconds_per_unit = 60;               break;
        case 's':
        case '\0':                                     break;
        default:  valid = false;                       break;
      }
      rollPeriodLength *= seconds_per_unit;

      if (!valid) {
        rollPeriod = ROLL_NEVER;
        LOG_OPER("[%s] WARNING: Bad config - invalid format of rotate_period,"
                 " rotations disabled", categoryHandled.c_str());
      }
    }
  }

  // ---- output options -----------------------------------------------------
  if (configuration->getString("write_meta", value) && value == "yes") {
    writeMeta = true;
  }
  if (configuration->getString("write_category", value) && value == "yes") {
    writeCategory = true;
  }
  if (configuration->getString("create_symlink", value)) {
    createSymlink = (value == "yes");
  }
  if (configuration->getString("write_stats", value)) {
    writeStats = (value == "yes");
  }

  // ---- file system and limits ---------------------------------------------
  configuration->getString("fs_type", fsType);

  configuration->getUnsigned("max_size", maxSize);
  configuration->getUnsigned("max_write_size", maxWriteSize);
  configuration->getUnsigned("rotate_hour", rollHour);
  configuration->getUnsigned("rotate_minute", rollMinute);
  configuration->getUnsigned("chunk_size", chunkSize);
}
264
 
265
void FileStoreBase::copyCommon(const FileStoreBase *base) {
266
  subDirectory = base->subDirectory;
267
  chunkSize = base->chunkSize;
268
  maxSize = base->maxSize;
269
  maxWriteSize = base->maxWriteSize;
270
  rollPeriod = base->rollPeriod;
271
  rollPeriodLength = base->rollPeriodLength;
272
  rollHour = base->rollHour;
273
  rollMinute = base->rollMinute;
274
  fsType = base->fsType;
275
  writeMeta = base->writeMeta;
276
  writeCategory = base->writeCategory;
277
  createSymlink = base->createSymlink;
278
  baseSymlinkName = base->baseSymlinkName;
279
  writeStats = base->writeStats;
280
 
281
  /*
282
   * append the category name to the base file path and change the
283
   * baseFileName to the category name. these are arbitrary, could be anything
284
   * unique
285
   */
286
  baseFilePath = base->baseFilePath + std::string("/") + categoryHandled;
287
  filePath = baseFilePath;
288
  if (!subDirectory.empty()) {
289
    filePath += "/" + subDirectory;
290
  }
291
 
292
  baseFileName = categoryHandled;
293
}
294
 
295
bool FileStoreBase::open() {
296
  return openInternal(false, NULL);
297
}
298
 
299
void FileStoreBase::periodicCheck() {
300
 
301
  time_t rawtime = time(NULL);
302
  struct tm timeinfo;
303
  localtime_r(&rawtime, &timeinfo);
304
 
305
  // Roll the file if we're over max size, or an hour or day has passed
306
  bool rotate = ((currentSize > maxSize) && (maxSize != 0));
307
  if (!rotate) {
308
    switch (rollPeriod) {
309
      case ROLL_DAILY:
310
        rotate = timeinfo.tm_mday != lastRollTime &&
311
                 static_cast<uint>(timeinfo.tm_hour) >= rollHour &&
312
                 static_cast<uint>(timeinfo.tm_min) >= rollMinute;
313
        break;
314
      case ROLL_HOURLY:
315
        rotate = timeinfo.tm_hour != lastRollTime &&
316
                 static_cast<uint>(timeinfo.tm_min) >= rollMinute;
317
        break;
318
      case ROLL_OTHER:
319
        rotate = rawtime >= lastRollTime + rollPeriodLength;
320
        break;
321
      case ROLL_NEVER:
322
        break;
323
    }
324
  }
325
 
326
  if (rotate) {
327
    rotateFile(rawtime);
328
  }
329
}
330
 
331
void FileStoreBase::rotateFile(time_t currentTime) {
332
  struct tm timeinfo;
333
 
334
  currentTime = currentTime > 0 ? currentTime : time(NULL);
335
  localtime_r(&currentTime, &timeinfo);
336
 
337
  LOG_OPER("[%s] %d:%d rotating file <%s> old size <%lu> max size <%lu>",
338
           categoryHandled.c_str(), timeinfo.tm_hour, timeinfo.tm_min,
339
           makeBaseFilename(&timeinfo).c_str(), currentSize, maxSize);
340
 
341
  printStats();
342
  openInternal(true, &timeinfo);
343
}
344
 
345
string FileStoreBase::makeFullFilename(int suffix, struct tm* creation_time,
346
                                       bool use_full_path) {
347
 
348
  ostringstream filename;
349
 
350
  if (use_full_path) {
351
    filename << filePath << '/';
352
  }
353
  filename << makeBaseFilename(creation_time);
354
  filename << '_' << setw(5) << setfill('0') << suffix;
355
 
356
  return filename.str();
357
}
358
 
359
string FileStoreBase::makeBaseSymlink() {
360
  ostringstream base;
361
  if (!baseSymlinkName.empty()) {
362
    base << baseSymlinkName << "_current";
363
  } else {
364
    base << baseFileName << "_current";
365
  }
366
  return base.str();
367
}
368
 
369
string FileStoreBase::makeFullSymlink() {
370
  ostringstream filename;
371
  filename << filePath << '/' << makeBaseSymlink();
372
  return filename.str();
373
}
374
 
375
string FileStoreBase::makeBaseFilename(struct tm* creation_time) {
376
  ostringstream filename;
377
 
378
  filename << baseFileName;
379
  if (rollPeriod != ROLL_NEVER) {
380
    filename << '-' << creation_time->tm_year + 1900  << '-'
381
             << setw(2) << setfill('0') << creation_time->tm_mon + 1 << '-'
382
             << setw(2) << setfill('0')  << creation_time->tm_mday;
383
 
384
  }
385
  return filename.str();
386
}
387
 
388
// returns the suffix of the newest file matching base_filename
389
int FileStoreBase::findNewestFile(const string& base_filename) {
390
 
391
  std::vector<std::string> files = FileInterface::list(filePath, fsType);
392
 
393
  int max_suffix = -1;
394
  std::string retval;
395
  for (std::vector<std::string>::iterator iter = files.begin();
396
       iter != files.end();
397
       ++iter) {
398
 
399
    int suffix = getFileSuffix(*iter, base_filename);
400
    if (suffix > max_suffix) {
401
      max_suffix = suffix;
402
    }
403
  }
404
  return max_suffix;
405
}
406
 
407
int FileStoreBase::findOldestFile(const string& base_filename) {
408
 
409
  std::vector<std::string> files = FileInterface::list(filePath, fsType);
410
 
411
  int min_suffix = -1;
412
  std::string retval;
413
  for (std::vector<std::string>::iterator iter = files.begin();
414
       iter != files.end();
415
       ++iter) {
416
 
417
    int suffix = getFileSuffix(*iter, base_filename);
418
    if (suffix >= 0 &&
419
        (min_suffix == -1 || suffix < min_suffix)) {
420
      min_suffix = suffix;
421
    }
422
  }
423
  return min_suffix;
424
}
425
 
426
// Returns the numeric suffix of a file named "<base_filename>_<number>", or
// -1 if `filename` does not belong to base_filename or the text after its
// last '_' does not start with an integer.
int FileStoreBase::getFileSuffix(const string& filename, const string& base_filename) {
  string::size_type suffix_pos = filename.rfind('_');
  if (string::npos == suffix_pos ||
      0 != filename.compare(0, suffix_pos, base_filename)) {
    return -1;
  }

  // The extraction result must be checked: since C++11 a failed
  // `stream >> suffix` stores 0, which would report non-numeric names such
  // as the "<base>_current" symlink as suffix 0.
  int suffix = -1;
  stringstream stream(filename.substr(suffix_pos + 1));
  if (!(stream >> suffix)) {
    return -1;
  }
  return suffix;
}
441
 
442
void FileStoreBase::printStats() {
443
  if (!writeStats) {
444
    return;
445
  }
446
 
447
  string filename(filePath);
448
  filename += "/scribe_stats";
449
 
450
  boost::shared_ptr<FileInterface> stats_file = FileInterface::createFileInterface(fsType, filename);
451
  if (!stats_file ||
452
      !stats_file->createDirectory(filePath) ||
453
      !stats_file->openWrite()) {
454
    LOG_OPER("[%s] Failed to open stats file <%s> of type <%s> for writing",
455
             categoryHandled.c_str(), filename.c_str(), fsType.c_str());
456
    // This isn't enough of a problem to change our status
457
    return;
458
  }
459
 
460
  time_t rawtime = time(NULL);
461
  struct tm timeinfo;
462
  localtime_r(&rawtime, &timeinfo);
463
 
464
  ostringstream msg;
465
  msg << timeinfo.tm_year + 1900  << '-'
466
      << setw(2) << setfill('0') << timeinfo.tm_mon + 1 << '-'
467
      << setw(2) << setfill('0') << timeinfo.tm_mday << '-'
468
      << setw(2) << setfill('0') << timeinfo.tm_hour << ':'
469
      << setw(2) << setfill('0') << timeinfo.tm_min;
470
 
471
  msg << " wrote <" << currentSize << "> bytes in <" << eventsWritten
472
      << "> events to file <" << currentFilename << ">" << endl;
473
 
474
  stats_file->write(msg.str());
475
  stats_file->close();
476
}
477
 
478
// Returns the number of bytes to pad to align to the specified chunk size
479
unsigned long FileStoreBase::bytesToPad(unsigned long next_message_length,
480
                                        unsigned long current_file_size,
481
                                        unsigned long chunk_size) {
482
 
483
  if (chunk_size > 0) {
484
    unsigned long space_left_in_chunk = chunk_size - current_file_size % chunk_size;
485
    if (next_message_length > space_left_in_chunk) {
486
      return space_left_in_chunk;
487
    } else {
488
      return 0;
489
    }
490
  }
491
  // chunk_size <= 0 means don't do any chunking
492
  return 0;
493
}
494
 
495
// set subDirectory to the name of this machine
496
void FileStoreBase::setHostNameSubDir() {
497
  if (!subDirectory.empty()) {
498
    string error_msg = "WARNING: Bad config - ";
499
    error_msg += "use_hostname_sub_directory will override sub_directory path";
500
    LOG_OPER("[%s] %s", categoryHandled.c_str(), error_msg.c_str());
501
  }
502
 
503
  char hostname[255];
504
  int error = gethostname(hostname, sizeof(hostname));
505
  if (error) {
506
    LOG_OPER("[%s] WARNING: gethostname returned error: %d ",
507
             categoryHandled.c_str(), error);
508
  }
509
 
510
  string hoststring(hostname);
511
 
512
  if (hoststring.empty()) {
513
    LOG_OPER("[%s] WARNING: could not get host name",
514
             categoryHandled.c_str());
515
  } else {
516
    subDirectory = hoststring;
517
  }
518
}
519
 
520
// A plain file store.  `is_buffer_file` marks the file as the backing store
// of a buffer: it is passed to FileInterface::createFileInterface() and makes
// configure() disable rotation and chunking.
FileStore::FileStore(const string& category, bool multi_category,
                     bool is_buffer_file)
  : FileStoreBase(category, "file", multi_category),
    isBufferFile(is_buffer_file),
    addNewlines(false) {
}
526
 
527
// Nothing to release explicitly.
FileStore::~FileStore() {}
529
 
530
// Applies the shared file-store settings, then the FileStore-specific ones:
// buffer-file restrictions and the "add_newlines" option (non-zero appends
// '\n' after each message).
void FileStore::configure(pStoreConf configuration) {
  FileStoreBase::configure(configuration);

  if (isBufferFile) {
    // Scheduled rotations of buffer files lead to too many messy cases.
    rollPeriod = ROLL_NEVER;

    // A buffer file must be framed (a size is written before each message so
    // binary messages can be separated), and FileStore does chunk padding
    // while FileInterface does framing; reading a file that is both would
    // require looking at both layers, so chunking (zero padding so that
    // every multiple of n bytes starts a message) is disabled here.
    chunkSize = 0;

    // All categories share one buffer file, so record each message's category.
    if (multiCategory) {
      writeCategory = true;
    }
  }

  unsigned long add_newlines = 0;
  configuration->getUnsigned("add_newlines", add_newlines);
  addNewlines = (add_newlines != 0);
}
561
 
562
bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) {
563
  bool success = false;
564
  struct tm timeinfo;
565
 
566
  if (!current_time) {
567
    time_t rawtime = time(NULL);
568
    localtime_r(&rawtime, &timeinfo);
569
    current_time = &timeinfo;
570
  }
571
 
572
  try {
573
    int suffix = findNewestFile(makeBaseFilename(current_time));
574
 
575
    if (incrementFilename) {
576
      ++suffix;
577
    }
578
 
579
    // this is the case where there's no file there and we're not incrementing
580
    if (suffix < 0) {
581
      suffix = 0;
582
    }
583
 
584
    string file = makeFullFilename(suffix, current_time);
585
 
586
    switch (rollPeriod) {
587
      case ROLL_DAILY:
588
        lastRollTime = current_time->tm_mday;
589
        break;
590
      case ROLL_HOURLY:
591
        lastRollTime = current_time->tm_hour;
592
        break;
593
      case ROLL_OTHER:
594
        lastRollTime = time(NULL);
595
        break;
596
      case ROLL_NEVER:
597
        break;
598
    }
599
 
600
    if (writeFile) {
601
      if (writeMeta) {
602
        writeFile->write(meta_logfile_prefix + file);
603
      }
604
      writeFile->close();
605
    }
606
 
607
    writeFile = FileInterface::createFileInterface(fsType, file, isBufferFile);
608
    if (!writeFile) {
609
      LOG_OPER("[%s] Failed to create file <%s> of type <%s> for writing",
610
               categoryHandled.c_str(), file.c_str(), fsType.c_str());
611
      setStatus("file open error");
612
      return false;
613
    }
614
 
615
    success = writeFile->createDirectory(baseFilePath);
616
 
617
    // If we created a subdirectory, we need to create two directories
618
    if (success && !subDirectory.empty()) {
619
      success = writeFile->createDirectory(filePath);
620
    }
621
 
622
    if (!success) {
623
      LOG_OPER("[%s] Failed to create directory for file <%s>",
624
               categoryHandled.c_str(), file.c_str());
625
      setStatus("File open error");
626
      return false;
627
    }
628
 
629
    success = writeFile->openWrite();
630
 
631
 
632
    if (!success) {
633
      LOG_OPER("[%s] Failed to open file <%s> for writing",
634
              categoryHandled.c_str(),
635
              file.c_str());
636
      setStatus("File open error");
637
    } else {
638
 
639
      /* just make a best effort here, and don't error if it fails */
640
      if (createSymlink && !isBufferFile) {
641
        string symlinkName = makeFullSymlink();
642
        boost::shared_ptr<FileInterface> tmp =
643
          FileInterface::createFileInterface(fsType, symlinkName, isBufferFile);
644
        tmp->deleteFile();
645
        string symtarget = makeFullFilename(suffix, current_time, false);
646
        writeFile->createSymlink(symtarget, symlinkName);
647
      }
648
      // else it confuses the filename code on reads
649
 
650
      LOG_OPER("[%s] Opened file <%s> for writing", categoryHandled.c_str(),
651
              file.c_str());
652
 
653
      currentSize = writeFile->fileSize();
654
      currentFilename = file;
655
      eventsWritten = 0;
656
      setStatus("");
657
    }
658
 
659
  } catch(std::exception const& e) {
660
    LOG_OPER("[%s] Failed to create/open file of type <%s> for writing",
661
             categoryHandled.c_str(), fsType.c_str());
662
    LOG_OPER("Exception: %s", e.what());
663
    setStatus("file create/open error");
664
 
665
    return false;
666
  }
667
  return success;
668
}
669
 
670
bool FileStore::isOpen() {
671
  return writeFile && writeFile->isOpen();
672
}
673
 
674
void FileStore::close() {
675
  if (writeFile) {
676
    writeFile->close();
677
  }
678
}
679
 
680
void FileStore::flush() {
681
  if (writeFile) {
682
    writeFile->flush();
683
  }
684
}
685
 
686
// Creates an unopened FileStore for `category` with this store's settings
// (see copyCommon() for how its location is derived).
shared_ptr<Store> FileStore::copy(const std::string &category) {
  FileStore* clone = new FileStore(category, multiCategory, isBufferFile);
  shared_ptr<Store> result(clone);  // owns the clone from here on

  clone->copyCommon(this);
  clone->addNewlines = addNewlines;
  return result;
}
694
 
695
// Appends `messages` to the current file; fails immediately if no file is
// open.  See writeMessages() for partial-failure semantics.
bool FileStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  if (isOpen()) {
    return writeMessages(messages);
  }

  LOG_OPER("[%s] File failed to open FileStore::handleMessages()", categoryHandled.c_str());
  return false;
}
705
 
706
// writes messages to either the specified file or the the current writeFile
707
bool FileStore::writeMessages(boost::shared_ptr<logentry_vector_t> messages,
708
                              boost::shared_ptr<FileInterface> file) {
709
  // Data is written to a buffer first, then sent to disk in one call to write.
710
  // This costs an extra copy of the data, but dramatically improves latency with
711
  // network based files. (nfs, etc)
712
  string        write_buffer;
713
  bool          success = true;
714
  unsigned long current_size_buffered = 0; // size of data in write_buffer
715
  unsigned long num_buffered = 0;
716
  unsigned long num_written = 0;
717
  boost::shared_ptr<FileInterface> write_file;
718
  unsigned long max_write_size = min(maxSize, maxWriteSize);
719
 
720
  // if no file given, use current writeFile
721
  if (file) {
722
    write_file = file;
723
  } else {
724
    write_file = writeFile;
725
  }
726
 
727
  try {
728
    for (logentry_vector_t::iterator iter = messages->begin();
729
         iter != messages->end();
730
         ++iter) {
731
 
732
      // have to be careful with the length here. getFrame wants the length without
733
      // the frame, then bytesToPad wants the length of the frame and the message.
734
      unsigned long length = 0;
735
      unsigned long message_length = (*iter)->message.length();
736
      string frame, category_frame;
737
 
738
      if (addNewlines) {
739
        ++message_length;
740
      }
741
 
742
      length += message_length;
743
 
744
      if (writeCategory) {
745
        //add space for category+newline and category frame
746
        unsigned long category_length = (*iter)->category.length() + 1;
747
        length += category_length;
748
 
749
        category_frame = write_file->getFrame(category_length);
750
        length += category_frame.length();
751
      }
752
 
753
      // frame is a header that the underlying file class can add to each message
754
      frame = write_file->getFrame(message_length);
755
 
756
      length += frame.length();
757
 
758
      // padding to align messages on chunk boundaries
759
      unsigned long padding = bytesToPad(length, current_size_buffered, chunkSize);
760
 
761
      length += padding;
762
 
763
      if (padding) {
764
        write_buffer += string(padding, 0);
765
      }
766
 
767
      if (writeCategory) {
768
        write_buffer += category_frame;
769
        write_buffer += (*iter)->category + "\n";
770
      }
771
 
772
      write_buffer += frame;
773
      write_buffer += (*iter)->message;
774
 
775
      if (addNewlines) {
776
        write_buffer += "\n";
777
      }
778
 
779
      current_size_buffered += length;
780
      num_buffered++;
781
 
782
      // Write buffer if processing last message or if larger than allowed
783
      if ((currentSize + current_size_buffered > max_write_size && maxSize != 0) ||
784
          messages->end() == iter + 1 ) {
785
        if (!write_file->write(write_buffer)) {
786
          LOG_OPER("[%s] File store failed to write (%lu) messages to file",
787
                   categoryHandled.c_str(), messages->size());
788
          setStatus("File write error");
789
          success = false;
790
          break;
791
        }
792
 
793
        num_written += num_buffered;
794
        currentSize += current_size_buffered;
795
        num_buffered = 0;
796
        current_size_buffered = 0;
797
        write_buffer = "";
798
      }
799
 
800
      // rotate file if large enough and not writing to a separate file
801
      if ((currentSize > maxSize && maxSize != 0 )&& !file) {
802
        rotateFile();
803
        write_file = writeFile;
804
      }
805
    }
806
  } catch (std::exception const& e) {
807
    LOG_OPER("[%s] File store failed to write. Exception: %s",
808
             categoryHandled.c_str(), e.what());
809
    success = false;
810
  }
811
 
812
  eventsWritten += num_written;
813
 
814
  if (!success) {
815
    close();
816
 
817
    // update messages to include only the messages that were not handled
818
    if (num_written > 0) {
819
      messages->erase(messages->begin(), messages->begin() + num_written);
820
    }
821
  }
822
 
823
  return success;
824
}
825
 
826
void FileStore::deleteOldest(struct tm* now) {
827
 
828
  int index = findOldestFile(makeBaseFilename(now));
829
  if (index < 0) {
830
    return;
831
  }
832
  shared_ptr<FileInterface> deletefile = FileInterface::createFileInterface(fsType,
833
                                            makeFullFilename(index, now));
834
  deletefile->deleteFile();
835
}
836
 
837
// Replace the messages in the oldest file at this timestamp with the input messages
838
bool FileStore::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
839
                              struct tm* now) {
840
  string base_name = makeBaseFilename(now);
841
  int index = findOldestFile(base_name);
842
  if (index < 0) {
843
    LOG_OPER("[%s] Could not find files <%s>", categoryHandled.c_str(), base_name.c_str());
844
    return false;
845
  }
846
 
847
  string filename = makeFullFilename(index, now);
848
 
849
  // Need to close and reopen store in case we already have this file open
850
  close();
851
 
852
  shared_ptr<FileInterface> infile = FileInterface::createFileInterface(fsType,
853
                                          filename, isBufferFile);
854
 
855
  // overwrite the old contents of the file
856
  bool success;
857
  if (infile->openTruncate()) {
858
    success = writeMessages(messages, infile);
859
 
860
  } else {
861
    LOG_OPER("[%s] Failed to open file <%s> for writing and truncate",
862
             categoryHandled.c_str(), filename.c_str());
863
    success = false;
864
  }
865
 
866
  // close this file and re-open store
867
  infile->close();
868
  open();
869
 
870
  return success;
871
}
872
 
873
bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
874
                           struct tm* now) {
875
 
876
  int index = findOldestFile(makeBaseFilename(now));
877
  if (index < 0) {
878
    // This isn't an error. It's legit to call readOldest when there aren't any
879
    // files left, in which case the call succeeds but returns messages empty.
880
    return true;
881
  }
882
  std::string filename = makeFullFilename(index, now);
883
 
884
  shared_ptr<FileInterface> infile = FileInterface::createFileInterface(fsType, filename, isBufferFile);
885
 
886
  if (!infile->openRead()) {
887
    LOG_OPER("[%s] Failed to open file <%s> for reading", categoryHandled.c_str(), filename.c_str());
888
    return false;
889
  }
890
 
891
  std::string message;
892
  while (infile->readNext(message)) {
893
    if (!message.empty()) {
894
      logentry_ptr_t entry = logentry_ptr_t(new LogEntry);
895
 
896
      // check whether a category is stored with the message
897
      if (writeCategory) {
898
        // get category without trailing \n
899
        entry->category = message.substr(0, message.length() - 1);
900
 
901
        if (!infile->readNext(message)) {
902
          LOG_OPER("[%s] category not stored with message <%s>",
903
                   categoryHandled.c_str(), entry->category.c_str());
904
        }
905
      } else {
906
        entry->category = categoryHandled;
907
      }
908
 
909
      entry->message = message;
910
 
911
      messages->push_back(entry);
912
    }
913
  }
914
  infile->close();
915
 
916
  LOG_OPER("[%s] successfully read <%lu> entries from file <%s>",
917
        categoryHandled.c_str(), messages->size(), filename.c_str());
918
  return true;
919
}
920
 
921
bool FileStore::empty(struct tm* now) {
922
 
923
  std::vector<std::string> files = FileInterface::list(filePath, fsType);
924
 
925
  std::string base_filename = makeBaseFilename(now);
926
  for (std::vector<std::string>::iterator iter = files.begin();
927
       iter != files.end();
928
       ++iter) {
929
    int suffix =  getFileSuffix(*iter, base_filename);
930
    if (-1 != suffix) {
931
      std::string fullname = makeFullFilename(suffix, now);
932
      shared_ptr<FileInterface> file = FileInterface::createFileInterface(fsType, fullname);
933
      if (file->fileSize()) {
934
        return false;
935
      }
936
    } // else it doesn't match the filename for this store
937
  }
938
  return true;
939
}
940
 
941
 
942
// All thrift-specific options start at 0. openInternal() only applies
// non-zero flush/buffer settings, and 0 selects TFileTransport over
// TSimpleFileTransport. configure() may override any of them.
ThriftFileStore::ThriftFileStore(const std::string& category, bool multi_category)
  : FileStoreBase(category, "thriftfile", multi_category)
  , flushFrequencyMs(0)
  , msgBufferSize(0)
  , useSimpleFile(0)
{
}

// No explicit cleanup is performed here.
ThriftFileStore::~ThriftFileStore() {}
951
 
952
// Creates a new ThriftFileStore for `category` that carries this store's
// thrift-specific settings plus the common file-store settings (copyCommon).
shared_ptr<Store> ThriftFileStore::copy(const std::string &category) {
  ThriftFileStore *store = new ThriftFileStore(category, multiCategory);
  shared_ptr<Store> copied = shared_ptr<Store>(store);

  store->flushFrequencyMs = flushFrequencyMs;
  store->msgBufferSize = msgBufferSize;
  // configure() reads use_simple_file alongside the two settings above, so it
  // must be propagated too; otherwise copies silently use TFileTransport.
  store->useSimpleFile = useSimpleFile;
  store->copyCommon(this);
  return copied;
}
961
 
962
// Writes each message's payload to the thrift file transport.
//
// Returns false if the store is not open, or if a write throws. In the latter
// case the messages already written are erased from `messages`, so the caller
// can see which ones remain unwritten. On success, rotates the file once
// currentSize exceeds a non-zero maxSize.
bool ThriftFileStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  if (!isOpen()) {
    return false;
  }

  unsigned long messages_handled = 0;
  for (logentry_vector_t::iterator iter = messages->begin();
       iter != messages->end();
       ++iter) {

    // This length is an estimate -- what the ThriftLogFile actually writes is a black box to us
    uint32_t length = (*iter)->message.size();

    try {
      thriftFileTransport->write(reinterpret_cast<const uint8_t*>((*iter)->message.data()), length);
      currentSize += length;
      ++eventsWritten;
      ++messages_handled;
    } catch (const TException& te) {
      // Catch by reference: catching by value copies the exception and slices
      // derived types (e.g. transport exceptions) down to TException, which
      // can change what() reports.
      LOG_OPER("[%s] Thrift file store failed to write to file: %s\n", categoryHandled.c_str(), te.what());
      setStatus("File write error");

      // If we already handled some messages, remove them from vector before
      // returning failure
      if (messages_handled) {
        messages->erase(messages->begin(), iter);
      }
      return false;
    }
  }
  // We can't wait until periodicCheck because we could be getting
  // a lot of data all at once in a failover situation
  if (currentSize > maxSize && maxSize != 0) {
    rotateFile();
  }

  return true;
}
1000
 
1001
bool ThriftFileStore::open() {
1002
  return openInternal(true, NULL);
1003
}
1004
 
1005
bool ThriftFileStore::isOpen() {
1006
  return thriftFileTransport && thriftFileTransport->isOpen();
1007
}
1008
 
1009
// Applies the common file-store settings first, then the thrift-specific ones.
void ThriftFileStore::configure(pStoreConf configuration) {
  FileStoreBase::configure(configuration);

  // A value of 0 for either of these leaves the TFileTransport default in
  // place, because openInternal() only applies non-zero values.
  configuration->getUnsigned("msg_buffer_size", msgBufferSize);
  configuration->getUnsigned("flush_frequency_ms", flushFrequencyMs);

  // Non-zero selects TSimpleFileTransport instead of TFileTransport.
  configuration->getUnsigned("use_simple_file", useSimpleFile);
}
1015
 
1016
void ThriftFileStore::close() {
1017
  thriftFileTransport.reset();
1018
}
1019
 
1020
// Intentionally a no-op: TFileTransport flushes on its own schedule, and
// flushing it from a second place as well introduces deadlocks.
void ThriftFileStore::flush() {
}
1025
 
1026
bool ThriftFileStore::openInternal(bool incrementFilename, struct tm* current_time) {
1027
  struct tm timeinfo;
1028
 
1029
  if (!current_time) {
1030
    time_t rawtime = time(NULL);
1031
    localtime_r(&rawtime, &timeinfo);
1032
    current_time = &timeinfo;
1033
  }
1034
 
1035
  int suffix = findNewestFile(makeBaseFilename(current_time));
1036
 
1037
  if (incrementFilename) {
1038
    ++suffix;
1039
  }
1040
 
1041
  // this is the case where there's no file there and we're not incrementing
1042
  if (suffix < 0) {
1043
    suffix = 0;
1044
  }
1045
 
1046
  string filename = makeFullFilename(suffix, current_time);
1047
  /* try to create the directory containing the file */
1048
  if (!createFileDirectory()) {
1049
    LOG_OPER("[%s] Could not create path for file: %s",
1050
             categoryHandled.c_str(), filename.c_str());
1051
    return false;
1052
  }
1053
 
1054
  switch (rollPeriod) {
1055
    case ROLL_DAILY:
1056
      lastRollTime = current_time->tm_mday;
1057
      break;
1058
    case ROLL_HOURLY:
1059
      lastRollTime = current_time->tm_hour;
1060
      break;
1061
    case ROLL_OTHER:
1062
      lastRollTime = time(NULL);
1063
      break;
1064
    case ROLL_NEVER:
1065
      break;
1066
  }
1067
 
1068
 
1069
  try {
1070
    if (useSimpleFile) {
1071
      thriftFileTransport.reset(new TSimpleFileTransport(filename, false, true));
1072
    } else {
1073
      TFileTransport *transport = new TFileTransport(filename);
1074
      thriftFileTransport.reset(transport);
1075
 
1076
      if (chunkSize != 0) {
1077
	transport->setChunkSize(chunkSize);
1078
      }
1079
      if (flushFrequencyMs > 0) {
1080
	transport->setFlushMaxUs(flushFrequencyMs * 1000);
1081
      }
1082
      if (msgBufferSize > 0) {
1083
	transport->setEventBufferSize(msgBufferSize);
1084
      }
1085
    }
1086
 
1087
    LOG_OPER("[%s] Opened file <%s> for writing", categoryHandled.c_str(), filename.c_str());
1088
 
1089
    struct stat st;
1090
    if (stat(filename.c_str(), &st) == 0) {
1091
      currentSize = st.st_size;
1092
    } else {
1093
      currentSize = 0;
1094
    }
1095
    currentFilename = filename;
1096
    eventsWritten = 0;
1097
    setStatus("");
1098
  } catch (TException te) {
1099
    LOG_OPER("[%s] Failed to open file <%s> for writing: %s\n", categoryHandled.c_str(), filename.c_str(), te.what());
1100
    setStatus("File open error");
1101
    return false;
1102
  }
1103
 
1104
  /* just make a best effort here, and don't error if it fails */
1105
  if (createSymlink) {
1106
    string symlinkName = makeFullSymlink();
1107
    unlink(symlinkName.c_str());
1108
    string symtarget = makeFullFilename(suffix, current_time, false);
1109
    symlink(symtarget.c_str(), symlinkName.c_str());
1110
  }
1111
 
1112
  return true;
1113
}
1114
 
1115
bool ThriftFileStore::createFileDirectory () {
1116
  try {
1117
    boost::filesystem::create_directories(filePath);
1118
  }catch(std::exception const& e) {
1119
    LOG_OPER("Exception < %s > in ThriftFileStore::createFileDirectory for path %s",
1120
      e.what(),filePath.c_str());
1121
    return false;
1122
  }
1123
  return true;
1124
}
1125
 
1126
// Starts DISCONNECTED with the default queue/retry settings. The child stores
// are only created later (by configure()), so nothing is opened here.
BufferStore::BufferStore(const string& category, bool multi_category)
  : Store(category, "buffer", multi_category)
  , maxQueueLength(DEFAULT_BUFFERSTORE_MAX_QUEUE_LENGTH)
  , bufferSendRate(DEFAULT_BUFFERSTORE_SEND_RATE)
  , avgRetryInterval(DEFAULT_BUFFERSTORE_AVG_RETRY_INTERVAL)
  , retryIntervalRange(DEFAULT_BUFFERSTORE_RETRY_INTERVAL_RANGE)
  , replayBuffer(true)
  , state(DISCONNECTED)
{
  const time_t createdAt = time(NULL);
  lastOpenAttempt = createdAt;
  lastWriteTime = lastOpenAttempt;
  retryInterval = getNewRetryInterval();
}

// No explicit cleanup is performed here.
BufferStore::~BufferStore() {}
1144
 
1145
// Reads the buffer store's own settings, then builds the secondary and
// primary child stores from the <secondary> and <primary> sub-configs.
// Problems with the child-store configuration are reported through
// setStatus() and stdout. Whatever happens, both child stores end up
// non-null: a missing or unusable child falls back to a default "file" store.
void BufferStore::configure(pStoreConf configuration) {

  // Constructor defaults are fine if these don't exist.
  // NOTE(review): these C-style reference casts assume the members are
  // declared as unsigned long — confirm against the class declaration.
  configuration->getUnsigned("max_queue_length", (unsigned long&) maxQueueLength);
  configuration->getUnsigned("buffer_send_rate", (unsigned long&) bufferSendRate);
  configuration->getUnsigned("retry_interval", (unsigned long&) avgRetryInterval);
  configuration->getUnsigned("retry_interval_range", (unsigned long&) retryIntervalRange);

  string tmp;
  if (configuration->getString("replay_buffer", tmp) && tmp != "yes") {
    replayBuffer = false;
  }

  if (retryIntervalRange > avgRetryInterval) {
    LOG_OPER("[%s] Bad config - retry_interval_range must be less than retry_interval. Using <%d> as range instead of <%d>",
             categoryHandled.c_str(), (int)avgRetryInterval, (int)retryIntervalRange);
    retryIntervalRange = avgRetryInterval;
  }

  pStoreConf secondary_store_conf;
  if (!configuration->getStore("secondary", secondary_store_conf)) {
    string msg("Bad config - buffer store doesn't have secondary store");
    setStatus(msg);
    cout << msg << endl;
  } else {
    string type;
    if (!secondary_store_conf->getString("type", type)) {
      string msg("Bad config - buffer secondary store doesn't have a type");
      setStatus(msg);
      cout << msg << endl;
    } else {
      // If replayBuffer is true, then we need to create a readable store
      secondaryStore = createStore(type, categoryHandled, replayBuffer,
                                   multiCategory);
      if (secondaryStore) {
        secondaryStore->configure(secondary_store_conf);
      } else {
        // createStore() can return null (e.g. unknown type). Report it and
        // let the fallback below create a default store instead of
        // dereferencing null.
        string msg("Bad config - can't create buffer secondary store of type: " + type);
        setStatus(msg);
        cout << msg << endl;
      }
    }
  }

  pStoreConf primary_store_conf;
  if (!configuration->getStore("primary", primary_store_conf)) {
    string msg("Bad config - buffer store doesn't have primary store");
    setStatus(msg);
    cout << msg << endl;
  } else {
    string type;
    if (!primary_store_conf->getString("type", type)) {
      string msg("Bad config - buffer primary store doesn't have a type");
      setStatus(msg);
      cout << msg << endl;
    } else if (0 == type.compare("multi")) {
      // Cannot allow multistores in bufferstores as they can partially fail to
      // handle a message.  We cannot retry sending a messages that was
      // already handled by a subset of stores in the multistore.
      string msg("Bad config - buffer primary store cannot be multistore");
      setStatus(msg);
      cout << msg << endl;
    } else {
      primaryStore = createStore(type, categoryHandled, false, multiCategory);
      if (primaryStore) {
        primaryStore->configure(primary_store_conf);
      } else {
        // Same null guard as for the secondary store above.
        string msg("Bad config - can't create buffer primary store of type: " + type);
        setStatus(msg);
        cout << msg << endl;
      }
    }
  }

  // If the config is bad we'll still try to write the data to a
  // default location on local disk.
  if (!secondaryStore) {
    secondaryStore = createStore("file", categoryHandled, true, multiCategory);
  }
  if (!primaryStore) {
    primaryStore = createStore("file", categoryHandled, false, multiCategory);
  }
}
1215
 
1216
bool BufferStore::isOpen() {
1217
  return primaryStore->isOpen() || secondaryStore->isOpen();
1218
}
1219
 
1220
bool BufferStore::open() {
1221
 
1222
  // try to open the primary store, and set the state accordingly
1223
  if (primaryStore->open()) {
1224
    // in case there are files left over from a previous instance
1225
    changeState(SENDING_BUFFER);
1226
 
1227
    // If we don't need to send buffers, skip to streaming
1228
    if (!replayBuffer) {
1229
      // We still switch state to SENDING_BUFFER first just to make sure we
1230
      // can open the secondary store
1231
      changeState(STREAMING);
1232
    }
1233
  } else {
1234
    secondaryStore->open();
1235
    changeState(DISCONNECTED);
1236
  }
1237
 
1238
  return isOpen();
1239
}
1240
 
1241
void BufferStore::close() {
1242
  if (primaryStore->isOpen()) {
1243
    primaryStore->flush();
1244
    primaryStore->close();
1245
  }
1246
  if (secondaryStore->isOpen()) {
1247
    secondaryStore->flush();
1248
    secondaryStore->close();
1249
  }
1250
}
1251
 
1252
void BufferStore::flush() {
1253
  if (primaryStore->isOpen()) {
1254
    primaryStore->flush();
1255
  }
1256
  if (secondaryStore->isOpen()) {
1257
    secondaryStore->flush();
1258
  }
1259
}
1260
 
1261
// Builds a BufferStore for `category` with the same queue/retry/replay
// settings. Its child stores are copies of this store's children, made for
// the same category.
shared_ptr<Store> BufferStore::copy(const std::string &category) {
  BufferStore *clone = new BufferStore(category, multiCategory);
  shared_ptr<Store> result(clone);

  clone->primaryStore = primaryStore->copy(category);
  clone->secondaryStore = secondaryStore->copy(category);

  clone->maxQueueLength = maxQueueLength;
  clone->bufferSendRate = bufferSendRate;
  clone->avgRetryInterval = avgRetryInterval;
  clone->retryIntervalRange = retryIntervalRange;
  clone->replayBuffer = replayBuffer;

  return result;
}
1275
 
1276
// Routes a batch to the primary store while STREAMING, otherwise to the
// secondary store. A failure (or an over-long batch) while streaming
// switches to DISCONNECTED, and the batch then goes to the secondary store.
bool BufferStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  lastWriteTime = time(NULL);

  if (state == STREAMING) {
    if (messages->size() > maxQueueLength) {
      // A very long queue probably means the primary store isn't keeping up
      // and is backing up; give up on it for now.
      LOG_OPER("[%s] BufferStore queue backing up, switching to secondary store (%u messages)", categoryHandled.c_str(), (unsigned)messages->size());
      changeState(DISCONNECTED);
    } else if (primaryStore->handleMessages(messages)) {
      return true;
    } else {
      changeState(DISCONNECTED);
    }
  }

  // Not streaming (any more): buffer in the secondary store.
  // If this fails there's nothing else we can do here.
  return secondaryStore->handleMessages(messages);
}
1301
 
1302
// handles entry and exit conditions for states
1303
void BufferStore::changeState(buffer_state_t new_state) {
1304
 
1305
  // leaving this state
1306
  switch (state) {
1307
  case STREAMING:
1308
    secondaryStore->open();
1309
    break;
1310
  case DISCONNECTED:
1311
    // Assume that if we are now able to leave the disconnected state, any
1312
    // former warning has now been fixed.
1313
    setStatus("");
1314
    break;
1315
  case SENDING_BUFFER:
1316
    break;
1317
  default:
1318
    break;
1319
  }
1320
 
1321
  // entering this state
1322
  switch (new_state) {
1323
  case STREAMING:
1324
    if (secondaryStore->isOpen()) {
1325
      secondaryStore->close();
1326
    }
1327
    break;
1328
  case DISCONNECTED:
1329
    // Do not set status here as it is possible to be in this frequently.
1330
    // Whatever caused us to enter this state should have either set status
1331
    // or chosen not to set status.
1332
    g_Handler->incrementCounter("retries");
1333
    lastOpenAttempt = time(NULL);
1334
    retryInterval = getNewRetryInterval();
1335
    LOG_OPER("[%s] choosing new retry interval <%d> seconds", categoryHandled.c_str(),
1336
             (int)retryInterval);
1337
    if (!secondaryStore->isOpen()) {
1338
      secondaryStore->open();
1339
    }
1340
    break;
1341
  case SENDING_BUFFER:
1342
    if (!secondaryStore->isOpen()) {
1343
      secondaryStore->open();
1344
    }
1345
    break;
1346
  default:
1347
    break;
1348
  }
1349
 
1350
  LOG_OPER("[%s] Changing state from <%s> to <%s>", categoryHandled.c_str(), stateAsString(state), stateAsString(new_state));
1351
  state = new_state;
1352
}
1353
 
1354
void BufferStore::periodicCheck() {
1355
 
1356
  // This class is responsible for checking its children
1357
  primaryStore->periodicCheck();
1358
  secondaryStore->periodicCheck();
1359
 
1360
  time_t now = time(NULL);
1361
  struct tm nowinfo;
1362
  localtime_r(&now, &nowinfo);
1363
 
1364
  if (state == DISCONNECTED) {
1365
 
1366
    if (now - lastOpenAttempt > retryInterval) {
1367
 
1368
      if (primaryStore->open()) {
1369
        // Success.  Check if we need to send buffers from secondary to primary
1370
        if (replayBuffer) {
1371
          changeState(SENDING_BUFFER);
1372
        } else {
1373
          changeState(STREAMING);
1374
        }
1375
      } else {
1376
        // this resets the retry timer
1377
        changeState(DISCONNECTED);
1378
      }
1379
    }
1380
  }
1381
 
1382
  if (state == SENDING_BUFFER) {
1383
 
1384
    // Read a group of messages from the secondary store and send them to
1385
    // the primary store. Note that the primary store could tell us to try
1386
    // again later, so this isn't very efficient if it reads too many
1387
    // messages at once. (if the secondary store is a file, the number of
1388
    // messages read is controlled by the max file size)
1389
    unsigned sent = 0;
1390
    for (sent = 0; sent < bufferSendRate; ++sent) {
1391
      boost::shared_ptr<logentry_vector_t> messages(new logentry_vector_t);
1392
      if (secondaryStore->readOldest(messages, &nowinfo)) {
1393
        lastWriteTime = time(NULL);
1394
 
1395
        unsigned long size = messages->size();
1396
        if (size) {
1397
          if (primaryStore->handleMessages(messages)) {
1398
            secondaryStore->deleteOldest(&nowinfo);
1399
          } else {
1400
 
1401
            if (messages->size() != size) {
1402
              // We were only able to process some, but not all of this batch
1403
              // of messages.  Replace this batch of messages with just the messages
1404
              // that were not processed.
1405
              LOG_OPER("[%s] buffer store primary store processed %lu/%lu messages",
1406
                       categoryHandled.c_str(), size - messages->size(), size);
1407
 
1408
              // Put back un-handled messages
1409
              if (!secondaryStore->replaceOldest(messages, &nowinfo)) {
1410
                // Nothing we can do but try to remove oldest messages and report a loss
1411
                LOG_OPER("[%s] buffer store secondary store lost %lu messages",
1412
                         categoryHandled.c_str(), messages->size());
1413
                g_Handler->incrementCounter("lost", messages->size());
1414
                secondaryStore->deleteOldest(&nowinfo);
1415
              }
1416
            }
1417
 
1418
            changeState(DISCONNECTED);
1419
            break;
1420
          }
1421
        }  else {
1422
          // else it's valid for read to not find anything but not error
1423
          secondaryStore->deleteOldest(&nowinfo);
1424
        }
1425
      } else {
1426
        // This is bad news. We'll stay in the sending state and keep trying to read.
1427
        setStatus("Failed to read from secondary store");
1428
        LOG_OPER("[%s] WARNING: buffer store can't read from secondary store", categoryHandled.c_str());
1429
        break;
1430
      }
1431
 
1432
      if (secondaryStore->empty(&nowinfo)) {
1433
        LOG_OPER("[%s] No more buffer files to send, switching to streaming mode", categoryHandled.c_str());
1434
        changeState(STREAMING);
1435
 
1436
        primaryStore->flush();
1437
        break;
1438
      }
1439
    }
1440
  }// if state == SENDING_BUFFER
1441
}
1442
 
1443
 
1444
time_t BufferStore::getNewRetryInterval() {
1445
  time_t interval = avgRetryInterval - retryIntervalRange/2 + rand() % retryIntervalRange;
1446
  return interval;
1447
}
1448
 
1449
// Returns a static, human-readable name for `state`, for log messages.
const char* BufferStore::stateAsString(buffer_state_t state) {
  const char* label = "unknown state";
  if (state == STREAMING) {
    label = "STREAMING";
  } else if (state == DISCONNECTED) {
    label = "DISCONNECTED";
  } else if (state == SENDING_BUFFER) {
    label = "SENDING_BUFFER";
  }
  return label;
}
1461
 
1462
std::string BufferStore::getStatus() {
1463
 
1464
  // This order is intended to give precedence to the errors
1465
  // that are likely to be the worst. We can handle a problem
1466
  // with the primary store, but not the secondary.
1467
  std::string return_status = secondaryStore->getStatus();
1468
  if (return_status.empty()) {
1469
    return_status = Store::getStatus();
1470
  }
1471
  if (return_status.empty()) {
1472
    return_status = primaryStore->getStatus();
1473
  }
1474
  return return_status;
1475
}
1476
 
1477
 
1478
// The connection is not opened here; that needs the host/port or smc service
// from configure(). `opened` starts false so that close() does not issue a
// duplicate close, which would upset the connection pool's reference counting.
NetworkStore::NetworkStore(const string& category, bool multi_category)
  : Store(category, "network", multi_category),
    useConnPool(false),
    smcBased(false),
    remotePort(0),
    serviceCacheTimeout(DEFAULT_NETWORKSTORE_CACHE_TIMEOUT),
    lastServiceCheck(0),
    opened(false) {
  // Give timeout the same default configure() falls back to, so open() and
  // copy() never read an indeterminate value if configure() was skipped.
  timeout = DEFAULT_SOCKET_TIMEOUT_MS;
}
1492
 
1493
// Releases the connection on destruction. close() returns immediately if the
// store is not open.
NetworkStore::~NetworkStore() {
  NetworkStore::close();
}
1496
 
1497
// Reads the connection settings. Nothing is validated here; open() reports
// bad settings. An smc_service entry takes precedence over
// remote_host/remote_port.
void NetworkStore::configure(pStoreConf configuration) {
  smcBased = configuration->getString("smc_service", smcService);

  if (smcBased) {
    // Constructor defaults are fine if these don't exist.
    configuration->getString("service_options", serviceOptions);
    configuration->getUnsigned("service_cache_timeout", serviceCacheTimeout);
  } else {
    configuration->getString("remote_host", remoteHost);
    configuration->getUnsigned("remote_port", remotePort);
  }

  if (!configuration->getInt("timeout", timeout)) {
    timeout = DEFAULT_SOCKET_TIMEOUT_MS;
  }

  // Only an explicit "yes" enables pooling; anything else leaves it unchanged.
  string pool_setting;
  if (configuration->getString("use_conn_pool", pool_setting) && pool_setting == "yes") {
    useConnPool = true;
  }
}
1523
 
1524
bool NetworkStore::open() {
1525
 
1526
  if (smcBased) {
1527
    bool success = true;
1528
    time_t now = time(NULL);
1529
 
1530
    // Only get list of servers if we haven't already gotten them recently
1531
    if (lastServiceCheck <= (time_t) (now - serviceCacheTimeout)) {
1532
      lastServiceCheck = now;
1533
 
1534
      success =
1535
        network_config::getService(smcService, serviceOptions, servers);
1536
    }
1537
 
1538
    // Cannot open if we couldn't find any servers
1539
    if (!success || servers.empty()) {
1540
      LOG_OPER("[%s] Failed to get servers from smc", categoryHandled.c_str());
1541
      setStatus("Could not get list of servers from smc");
1542
      return false;
1543
    }
1544
 
1545
    if (useConnPool) {
1546
      opened = g_connPool.open(smcService, servers, static_cast<int>(timeout));
1547
    } else {
1548
      // only open unpooled connection if not already open
1549
      if (unpooledConn == NULL) {
1550
        unpooledConn = shared_ptr<scribeConn>(new scribeConn(smcService, servers, static_cast<int>(timeout)));
1551
        opened = unpooledConn->open();
1552
      } else {
1553
        opened = unpooledConn->isOpen();
1554
        if (!opened) {
1555
          opened = unpooledConn->open();
1556
        }
1557
      }
1558
    }
1559
 
1560
  } else if (remotePort <= 0 ||
1561
             remoteHost.empty()) {
1562
    LOG_OPER("[%s] Bad config - won't attempt to connect to <%s:%lu>", categoryHandled.c_str(), remoteHost.c_str(), remotePort);
1563
    setStatus("Bad config - invalid location for remote server");
1564
    return false;
1565
 
1566
  } else {
1567
    if (useConnPool) {
1568
      opened = g_connPool.open(remoteHost, remotePort, static_cast<int>(timeout));
1569
    } else {
1570
      // only open unpooled connection if not already open
1571
      if (unpooledConn == NULL) {
1572
        unpooledConn = shared_ptr<scribeConn>(new scribeConn(remoteHost, remotePort, static_cast<int>(timeout)));
1573
        opened = unpooledConn->open();
1574
      } else {
1575
        opened = unpooledConn->isOpen();
1576
        if (!opened) {
1577
          opened = unpooledConn->open();
1578
        }
1579
      }
1580
    }
1581
  }
1582
 
1583
 
1584
  if (opened) {
1585
    setStatus("");
1586
  } else {
1587
    setStatus("Failed to connect");
1588
  }
1589
  return opened;
1590
}
1591
 
1592
void NetworkStore::close() {
1593
  if (!opened) {
1594
    return;
1595
  }
1596
  opened = false;
1597
  if (useConnPool) {
1598
    if (smcBased) {
1599
      g_connPool.close(smcService);
1600
    } else {
1601
      g_connPool.close(remoteHost, remotePort);
1602
    }
1603
  } else {
1604
    if (unpooledConn != NULL) {
1605
      unpooledConn->close();
1606
    }
1607
  }
1608
}
1609
 
1610
bool NetworkStore::isOpen() {
1611
  return opened;
1612
}
1613
 
1614
// Creates a NetworkStore for `category` carrying every connection setting
// that configure() reads.
shared_ptr<Store> NetworkStore::copy(const std::string &category) {
  NetworkStore *store = new NetworkStore(category, multiCategory);
  shared_ptr<Store> copied = shared_ptr<Store>(store);

  store->useConnPool = useConnPool;
  store->smcBased = smcBased;
  store->timeout = timeout;
  store->remoteHost = remoteHost;
  store->remotePort = remotePort;
  store->smcService = smcService;
  // configure() also sets these for smc-based stores. Without them a copy
  // would query smc with empty options and the default cache timeout.
  store->serviceOptions = serviceOptions;
  store->serviceCacheTimeout = serviceCacheTimeout;

  return copied;
}
1627
 
1628
// Sends the batch over the pooled or private connection and returns the
// send result. Calling this on a closed store is a logic error and fails.
bool NetworkStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  if (!isOpen()) {
    LOG_OPER("[%s] Logic error: NetworkStore::handleMessages called on closed store", categoryHandled.c_str());
    return false;
  }

  if (!useConnPool) {
    if (!unpooledConn) {
      LOG_OPER("[%s] Logic error: NetworkStore::handleMessages unpooledConn is NULL", categoryHandled.c_str());
      return false;
    }
    return unpooledConn->send(messages);
  }

  if (smcBased) {
    return g_connPool.send(smcService, messages);
  }
  return g_connPool.send(remoteHost, remotePort, messages);
}
1647
 
1648
// No-op: handleMessages() passes batches straight to the connection, so this
// store keeps nothing of its own to flush.
void NetworkStore::flush() {
}
1651
 
1652
// Defaults: context_log bucketing, the default delimiter, keys kept, a single
// bucket with range 0. configure() overrides these.
BucketStore::BucketStore(const string& category, bool multi_category)
  : Store(category, "bucket", multi_category)
  , bucketType(context_log)
  , delimiter(DEFAULT_BUCKETSTORE_DELIMITER)
  , removeKey(false)
  , opened(false)
  , bucketRange(0)
  , numBuckets(1)
{
}

// No explicit cleanup is performed here.
BucketStore::~BucketStore() {}
1665
 
1666
// Given a single bucket definition, create multiple buckets
1667
void BucketStore::createBucketsFromBucket(pStoreConf configuration,
1668
					  pStoreConf bucket_conf) {
1669
  string error_msg, bucket_subdir, type, path, failure_bucket;
1670
  bool needs_bucket_subdir = false;
1671
  unsigned long bucket_offset = 0;
1672
  pStoreConf tmp;
1673
 
1674
  // check for extra bucket definitions
1675
  if (configuration->getStore("bucket0", tmp) ||
1676
      configuration->getStore("bucket1", tmp)) {
1677
    error_msg = "bucket store has too many buckets defined";
1678
    goto handle_error;
1679
  }
1680
 
1681
  bucket_conf->getString("type", type);
1682
  if (type != "file" && type != "thriftfile") {
1683
    error_msg = "store contained in a bucket store must have a type of ";
1684
    error_msg += "either file or thriftfile if not defined explicitely";
1685
    goto handle_error;
1686
  }
1687
 
1688
  needs_bucket_subdir = true;
1689
  if (!configuration->getString("bucket_subdir", bucket_subdir)) {
1690
    error_msg =
1691
      "bucketizer containing file stores must have a bucket_subdir";
1692
    goto handle_error;
1693
  }
1694
  if (!bucket_conf->getString("file_path", path)) {
1695
    error_msg =
1696
      "file store contained by bucketizer must have a file_path";
1697
    goto handle_error;
1698
  }
1699
 
1700
  // set starting bucket number if specified
1701
  configuration->getUnsigned("bucket_offset", bucket_offset);
1702
 
1703
  // check if failure bucket was given a different name
1704
  configuration->getString("failure_bucket", failure_bucket);
1705
 
1706
  // We actually create numBuckets + 1 stores. Messages are normally
1707
  // hashed into buckets 1 through numBuckets, and messages that can't
1708
  // be hashed are put in bucket 0.
1709
 
1710
  for (unsigned int i = 0; i <= numBuckets; ++i) {
1711
 
1712
    shared_ptr<Store> newstore =
1713
      createStore(type, categoryHandled, false, multiCategory);
1714
 
1715
    if (!newstore) {
1716
      error_msg = "can't create store of type: ";
1717
      error_msg += type;
1718
      goto handle_error;
1719
    }
1720
 
1721
    // For file/thrift file buckets, create unique filepath for each bucket
1722
    if (needs_bucket_subdir) {
1723
      if (i == 0 && !failure_bucket.empty()) {
1724
        bucket_conf->setString("file_path", path + '/' + failure_bucket);
1725
      } else {
1726
        // the bucket number is appended to the file path
1727
        unsigned int bucket_id = i + bucket_offset;
1728
 
1729
        ostringstream oss;
1730
        oss << path << '/' << bucket_subdir << setw(3) << setfill('0')
1731
            << bucket_id;
1732
        bucket_conf->setString("file_path", oss.str());
1733
      }
1734
    }
1735
 
1736
    buckets.push_back(newstore);
1737
    newstore->configure(bucket_conf);
1738
  }
1739
 
1740
  return;
1741
 
1742
handle_error:
1743
  setStatus(error_msg);
1744
  LOG_OPER("[%s] Bad config - %s", categoryHandled.c_str(),
1745
           error_msg.c_str());
1746
  numBuckets = 0;
1747
  buckets.clear();
1748
}
1749
 
1750
// Checks for a bucket definition for every bucket from 0 to numBuckets
1751
// and configures each bucket
1752
void BucketStore::createBuckets(pStoreConf configuration) {
1753
  string error_msg, tmp_string;
1754
  pStoreConf tmp;
1755
  unsigned long i;
1756
 
1757
  if (configuration->getString("bucket_subdir", tmp_string)) {
1758
    error_msg =
1759
      "cannot have bucket_subdir when defining multiple buckets";
1760
      goto handle_error;
1761
  }
1762
 
1763
  if (configuration->getString("bucket_offset", tmp_string)) {
1764
    error_msg =
1765
      "cannot have bucket_offset when defining multiple buckets";
1766
      goto handle_error;
1767
  }
1768
 
1769
  if (configuration->getString("failure_bucket", tmp_string)) {
1770
    error_msg =
1771
      "cannot have failure_bucket when defining multiple buckets";
1772
      goto handle_error;
1773
  }
1774
 
1775
  // Configure stores named 'bucket0, bucket1, bucket2, ... bucket{numBuckets}
1776
  for (i = 0; i <= numBuckets; i++) {
1777
    pStoreConf   bucket_conf;
1778
    string       type, bucket_name;
1779
    stringstream ss;
1780
 
1781
    ss << "bucket" << i;
1782
    bucket_name = ss.str();
1783
 
1784
    if (!configuration->getStore(bucket_name, bucket_conf)) {
1785
      error_msg = "could not find bucket definition for " +
1786
	bucket_name;
1787
      goto handle_error;
1788
    }
1789
 
1790
    if (!bucket_conf->getString("type", type)) {
1791
      error_msg =
1792
	"store contained in a bucket store must have a type";
1793
      goto handle_error;
1794
    }
1795
 
1796
    shared_ptr<Store> bucket =
1797
      createStore(type, categoryHandled, false, multiCategory);
1798
 
1799
    buckets.push_back(bucket);
1800
    bucket->configure(bucket_conf);
1801
  }
1802
 
1803
  // Check if an extra bucket is defined
1804
  if (configuration->getStore("bucket" + (numBuckets + 1), tmp)) {
1805
    error_msg = "bucket store has too many buckets defined";
1806
    goto handle_error;
1807
  }
1808
 
1809
  return;
1810
 
1811
handle_error:
1812
  setStatus(error_msg);
1813
  LOG_OPER("[%s] Bad config - %s", categoryHandled.c_str(),
1814
           error_msg.c_str());
1815
  numBuckets = 0;
1816
  buckets.clear();
1817
}
1818
 
1819
/**
1820
   * Buckets in a bucket store can be defined explicitly or implicitly:
1821
   *
1822
   * #Explicitly
1823
   * <store>
1824
   *   type=bucket
1825
   *   num_buckets=2
1826
   *   bucket_type=key_hash
1827
   *
1828
   *   <bucket0>
1829
   *     ...
1830
   *   </bucket0>
1831
   *
1832
   *   <bucket1>
1833
   *     ...
1834
   *   </bucket1>
1835
   *
1836
   *   <bucket2>
1837
   *     ...
1838
   *   </bucket2>
1839
   * </store>
1840
   *
1841
   * #Implicitly
1842
   * <store>
1843
   *   type=bucket
1844
   *   num_buckets=2
1845
   *   bucket_type=key_hash
1846
   *
1847
   *   <bucket>
1848
   *     ...
1849
   *   </bucket>
1850
   * </store>
1851
   */
1852
void BucketStore::configure(pStoreConf configuration) {
1853
 
1854
  string error_msg, bucketizer_str, remove_key_str;
1855
  unsigned long delim_long = 0;
1856
  pStoreConf bucket_conf;
1857
  //set this to true for bucket types that have a delimiter
1858
  bool need_delimiter = false;
1859
 
1860
  configuration->getString("bucket_type", bucketizer_str);
1861
 
1862
  // Figure out th bucket type from the bucketizer string
1863
  if (0 == bucketizer_str.compare("context_log")) {
1864
    bucketType = context_log;
1865
  } else if (0 == bucketizer_str.compare("random")) {
1866
      bucketType = random;
1867
  } else if (0 == bucketizer_str.compare("key_hash")) {
1868
    bucketType = key_hash;
1869
    need_delimiter = true;
1870
  } else if (0 == bucketizer_str.compare("key_modulo")) {
1871
    bucketType = key_modulo;
1872
    need_delimiter = true;
1873
  } else if (0 == bucketizer_str.compare("key_range")) {
1874
    bucketType = key_range;
1875
    need_delimiter = true;
1876
    configuration->getUnsigned("bucket_range", bucketRange);
1877
 
1878
    if (bucketRange == 0) {
1879
      LOG_OPER("[%s] config warning - bucket_range is 0",
1880
               categoryHandled.c_str());
1881
    }
1882
  }
1883
 
1884
  // This is either a key_hash or key_modulo, not context log, figure out the delimiter and store it
1885
  if (need_delimiter) {
1886
    configuration->getUnsigned("delimiter", delim_long);
1887
    if (delim_long > 255) {
1888
      LOG_OPER("[%s] config warning - delimiter is too large to fit in a char, using default", categoryHandled.c_str());
1889
      delimiter = DEFAULT_BUCKETSTORE_DELIMITER;
1890
    } else if (delim_long == 0) {
1891
      LOG_OPER("[%s] config warning - delimiter is zero, using default", categoryHandled.c_str());
1892
      delimiter = DEFAULT_BUCKETSTORE_DELIMITER;
1893
    } else {
1894
      delimiter = (char)delim_long;
1895
    }
1896
  }
1897
 
1898
  // Optionally remove the key and delimiter of each message before bucketizing
1899
  configuration->getString("remove_key", remove_key_str);
1900
  if (remove_key_str == "yes") {
1901
    removeKey = true;
1902
 
1903
    if (bucketType == context_log) {
1904
      error_msg =
1905
        "Bad config - bucketizer store of type context_log do not support remove_key";
1906
      goto handle_error;
1907
    }
1908
  }
1909
 
1910
  if (!configuration->getUnsigned("num_buckets", numBuckets)) {
1911
    error_msg = "Bad config - bucket store must have num_buckets";
1912
    goto handle_error;
1913
  }
1914
 
1915
  // Buckets can be defined explicitely or by specifying a single "bucket"
1916
  if (configuration->getStore("bucket", bucket_conf)) {
1917
    createBucketsFromBucket(configuration, bucket_conf);
1918
  } else {
1919
    createBuckets(configuration);
1920
  }
1921
 
1922
  return;
1923
 
1924
handle_error:
1925
  setStatus(error_msg);
1926
  LOG_OPER("[%s] %s", categoryHandled.c_str(), error_msg.c_str());
1927
  numBuckets = 0;
1928
  buckets.clear();
1929
}
1930
 
1931
bool BucketStore::open() {
1932
  // we have one extra bucket for messages we can't hash
1933
  if (numBuckets <= 0 || buckets.size() != numBuckets + 1) {
1934
    LOG_OPER("[%s] Can't open bucket store with <%d> of <%lu> buckets", categoryHandled.c_str(), (int)buckets.size(), numBuckets);
1935
    return false;
1936
  }
1937
 
1938
  for (std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
1939
       iter != buckets.end();
1940
       ++iter) {
1941
 
1942
    if (!(*iter)->open()) {
1943
      close();
1944
      opened = false;
1945
      return false;
1946
    }
1947
  }
1948
  opened = true;
1949
  return true;
1950
}
1951
 
1952
bool BucketStore::isOpen() {
1953
  return opened;
1954
}
1955
 
1956
void BucketStore::close() {
1957
  // don't check opened, because we can call this when some, but
1958
  // not all, contained stores are opened. Calling close on a contained
1959
  // store that's already closed shouldn't hurt anything.
1960
  for (std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
1961
       iter != buckets.end();
1962
       ++iter) {
1963
    (*iter)->close();
1964
  }
1965
  opened = false;
1966
}
1967
 
1968
void BucketStore::flush() {
1969
  for (std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
1970
       iter != buckets.end();
1971
       ++iter) {
1972
    (*iter)->flush();
1973
  }
1974
}
1975
 
1976
string BucketStore::getStatus() {
1977
 
1978
  string retval = Store::getStatus();
1979
 
1980
  std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
1981
  while (retval.empty() && iter != buckets.end()) {
1982
    retval = (*iter)->getStatus();
1983
    ++iter;
1984
  }
1985
  return retval;
1986
}
1987
 
1988
void BucketStore::periodicCheck() {
1989
  for (std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
1990
       iter != buckets.end();
1991
       ++iter) {
1992
    (*iter)->periodicCheck();
1993
  }
1994
}
1995
 
1996
/**
 * Creates a BucketStore for `category` with the same bucketing settings as
 * this one. Bucket i of the copy is buckets[i]->copy(category). The copy is
 * not opened here.
 */
shared_ptr<Store> BucketStore::copy(const std::string &category) {
  BucketStore *store = new BucketStore(category, multiCategory);
  shared_ptr<Store> copied = shared_ptr<Store>(store);

  store->numBuckets = numBuckets;
  store->bucketType = bucketType;
  store->delimiter = delimiter;
  // configure() sets these too; without them, a copied key_range store
  // would lose its bucket_range and a remove_key store would forward keyed
  // messages (both would fall back to the constructor's values).
  store->bucketRange = bucketRange;
  store->removeKey = removeKey;

  for (std::vector<shared_ptr<Store> >::iterator iter = buckets.begin();
       iter != buckets.end();
       ++iter) {
    store->buckets.push_back((*iter)->copy(category));
  }

  return copied;
}
2012
 
2013
/**
 * Splits `messages` into per-bucket batches with bucketize() and hands each
 * non-empty batch to its bucket. When remove_key is enabled, buckets receive
 * new entries whose message has the key and delimiter stripped.
 *
 * Returns true when every batch was handled. Otherwise returns false and
 * replaces the contents of `messages` with the original (un-stripped)
 * entries of every batch that failed.
 */
bool BucketStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  if (numBuckets == 0) {
    LOG_OPER("[%s] Failed to write - no buckets configured",
             categoryHandled.c_str());
    setStatus("Failed write to bucket store");
    return false;
  }

  // Every index 0..numBuckets is used to address `buckets` below.
  if (buckets.size() != numBuckets + 1) {
    LOG_OPER("[%s] Failed to write - have <%lu> buckets, expected <%lu>",
             categoryHandled.c_str(), (unsigned long)buckets.size(),
             (unsigned long)(numBuckets + 1));
    setStatus("Failed write to bucket store");
    return false;
  }

  bool success = true;
  boost::shared_ptr<logentry_vector_t> failed_messages(new logentry_vector_t);
  // One lazily allocated batch per bucket; index 0 is the catch-all bucket.
  vector<shared_ptr<logentry_vector_t> > bucketed_messages(numBuckets + 1);

  // batch messages by bucket
  for (logentry_vector_t::iterator iter = messages->begin();
       iter != messages->end();
       ++iter) {
    unsigned long bucket = bucketize((*iter)->message);
    if (bucket > numBuckets) {
      // Never index past the last batch; treat it like an unbucketable message.
      bucket = 0;
    }

    if (!bucketed_messages[bucket]) {
      bucketed_messages[bucket].reset(new logentry_vector_t);
    }
    bucketed_messages[bucket]->push_back(*iter);
  }

  // handle all batches of messages
  for (unsigned long i = 0; i <= numBuckets; i++) {
    shared_ptr<logentry_vector_t> batch = bucketed_messages[i];
    if (!batch) {
      continue;
    }

    if (removeKey) {
      // Give the bucket new entries with the key stripped. The originals stay
      // in bucketed_messages[i] so failures are reported unmodified.
      shared_ptr<logentry_vector_t> key_removed(new logentry_vector_t);
      for (logentry_vector_t::iterator iter = batch->begin();
           iter != batch->end();
           ++iter) {
        logentry_ptr_t entry(new LogEntry);
        entry->category = (*iter)->category;
        entry->message = getMessageWithoutKey((*iter)->message);
        key_removed->push_back(entry);
      }
      batch = key_removed;
    }

    if (!buckets[i]->handleMessages(batch)) {
      // keep track of messages that were not handled
      failed_messages->insert(failed_messages->end(),
                              bucketed_messages[i]->begin(),
                              bucketed_messages[i]->end());
      success = false;
    }
  }

  if (!success) {
    // return failed logentrys in messages
    messages->swap(*failed_messages);
  }

  return success;
}
2080
 
2081
unsigned long BucketStore::bucketize(const std::string& message) {
2082
 
2083
  string::size_type length = message.length();
2084
 
2085
  if (bucketType == context_log) {
2086
    // the key is in ascii after the third delimiter
2087
    char delim = 1;
2088
    string::size_type pos = 0;
2089
    for (int i = 0; i < 3; ++i) {
2090
      pos = message.find(delim, pos);
2091
      if (pos == string::npos || length <= pos + 1) {
2092
        return 0;
2093
      }
2094
      ++pos;
2095
    }
2096
    if (message[pos] == delim) {
2097
      return 0;
2098
    }
2099
 
2100
    uint32_t id = strtoul(message.substr(pos).c_str(), NULL, 10);
2101
    if (id == 0) {
2102
      return 0;
2103
    }
2104
 
2105
    if (numBuckets == 0) {
2106
      return 0;
2107
    } else {
2108
      return (integerhash::hash32(id) % numBuckets) + 1;
2109
    }
2110
  } else if (bucketType == random) {
2111
    // return any random bucket
2112
    return (rand() % numBuckets) + 1;
2113
  } else {
2114
    // just hash everything before the first user-defined delimiter
2115
    string::size_type pos = message.find(delimiter);
2116
    if (pos == string::npos) {
2117
      // if no delimiter found, write to bucket 0
2118
      return 0;
2119
    }
2120
 
2121
    string key = message.substr(0, pos).c_str();
2122
    if (key.empty()) {
2123
      // if no key found, write to bucket 0
2124
      return 0;
2125
    }
2126
 
2127
    if (numBuckets == 0) {
2128
      return 0;
2129
    } else {
2130
      switch (bucketType) {
2131
        case key_modulo:
2132
          // No hashing, just simple modulo
2133
          return (atol(key.c_str()) % numBuckets) + 1;
2134
          break;
2135
        case key_range:
2136
          if (bucketRange == 0) {
2137
            return 0;
2138
          } else {
2139
            // Calculate what bucket this key would fall into if we used
2140
            // bucket_range to compute the modulo
2141
           double key_mod = atol(key.c_str()) % bucketRange;
2142
           return (unsigned long) ((key_mod / bucketRange) * numBuckets) + 1;
2143
          }
2144
          break;
2145
        case key_hash:
2146
        default:
2147
          // Hashing by default.
2148
          return (strhash::hash32(key.c_str()) % numBuckets) + 1;
2149
          break;
2150
      }
2151
    }
2152
  }
2153
 
2154
  return 0;
2155
}
2156
 
2157
/**
 * Returns `message` with everything up to and including the first
 * `delimiter` removed. Returns `message` unchanged if it contains no
 * delimiter.
 */
string BucketStore::getMessageWithoutKey(const std::string& message) {
  const string::size_type delim_pos = message.find(delimiter);
  return (delim_pos == string::npos) ? message
                                     : message.substr(delim_pos + 1);
}
2166
 
2167
 
2168
NullStore::NullStore(const std::string& category, bool multi_category)
2169
  : Store(category, "null", multi_category)
2170
{}
2171
 
2172
NullStore::~NullStore() {
2173
}
2174
 
2175
boost::shared_ptr<Store> NullStore::copy(const std::string &category) {
2176
  NullStore *store = new NullStore(category, multiCategory);
2177
  shared_ptr<Store> copied = shared_ptr<Store>(store);
2178
  return copied;
2179
}
2180
 
2181
bool NullStore::open() {
2182
  return true;
2183
}
2184
 
2185
bool NullStore::isOpen() {
2186
  return true;
2187
}
2188
 
2189
void NullStore::configure(pStoreConf) {
2190
}
2191
 
2192
void NullStore::close() {
2193
}
2194
 
2195
bool NullStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
2196
  g_Handler->incrementCounter("ignored", messages->size());
2197
  return true;
2198
}
2199
 
2200
void NullStore::flush() {
2201
}
2202
 
2203
bool NullStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
2204
                       struct tm* now) {
2205
  return true;
2206
}
2207
 
2208
bool NullStore::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
2209
                              struct tm* now) {
2210
  return true;
2211
}
2212
 
2213
void NullStore::deleteOldest(struct tm* now) {
2214
}
2215
 
2216
bool NullStore::empty(struct tm* now) {
2217
  return true;
2218
}
2219
 
2220
/** "multi" store: forwards every message to each configured child store. */
MultiStore::MultiStore(const std::string& category, bool multi_category)
  : Store(category, "multi", multi_category) {}

MultiStore::~MultiStore() {}
2226
 
2227
/**
 * Creates a MultiStore for `category` with the same report_success setting
 * and a copy of every child store (child i of the copy is
 * stores[i]->copy(category)).
 */
boost::shared_ptr<Store> MultiStore::copy(const std::string &category) {
  MultiStore *store = new MultiStore(category, multiCategory);
  // Take ownership right away so the new store is released if copying one
  // of the children throws.
  boost::shared_ptr<Store> copied(store);
  store->report_success = this->report_success;

  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
       iter != stores.end();
       ++iter) {
    store->stores.push_back((*iter)->copy(category));
  }

  return copied;
}
2240
 
2241
bool MultiStore::open() {
2242
  bool all_result = true;
2243
  bool any_result = false;
2244
  bool cur_result;
2245
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
2246
       iter != stores.end();
2247
       ++iter) {
2248
    cur_result = (*iter)->open();
2249
    any_result |= cur_result;
2250
    all_result &= cur_result;
2251
  }
2252
  return (report_success == SUCCESS_ALL) ? all_result : any_result;
2253
}
2254
 
2255
bool MultiStore::isOpen() {
2256
  bool all_result = true;
2257
  bool any_result = false;
2258
  bool cur_result;
2259
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
2260
       iter != stores.end();
2261
       ++iter) {
2262
    cur_result = (*iter)->isOpen();
2263
    any_result |= cur_result;
2264
    all_result &= cur_result;
2265
  }
2266
  return (report_success == SUCCESS_ALL) ? all_result : any_result;
2267
}
2268
 
2269
/**
 * Configures the child stores, which are numbered stores inside this one:
 * <store>
 *   type=multi
 *   report_success=all|any
 *   <store0>
 *     ...
 *   </store0>
 *   ...
 *   <storen>
 *     ...
 *   </storen>
 * </store>
 *
 * Children may be numbered from 0 or from 1. Scanning stops at the first
 * missing index other than 0. report_success defaults to "all".
 *
 * Configuration stops with the status set on any of:
 * - an invalid report_success value;
 * - a child without a type;
 * - a child type for which createStore() returns no store.
 */
void MultiStore::configure(pStoreConf configuration) {
  pStoreConf cur_conf;
  string cur_type;
  boost::shared_ptr<Store> cur_store;
  string report_preference;

  // find reporting preference
  if (configuration->getString("report_success", report_preference)) {
    if (0 == report_preference.compare("all")) {
      report_success = SUCCESS_ALL;
      LOG_OPER("[%s] MULTI: Logging success only if all stores succeed.",
               categoryHandled.c_str());
    } else if (0 == report_preference.compare("any")) {
      report_success = SUCCESS_ANY;
      LOG_OPER("[%s] MULTI: Logging success if any store succeeds.",
               categoryHandled.c_str());
    } else {
      LOG_OPER("[%s] MULTI: %s is an invalid value for report_success.",
               categoryHandled.c_str(), report_preference.c_str());
      setStatus("MULTI: Invalid report_success value.");
      return;
    }
  } else {
    report_success = SUCCESS_ALL;
  }

  // find stores
  for (int i = 0; ; ++i) {
    stringstream ss;
    ss << "store" << i;
    if (!configuration->getStore(ss.str(), cur_conf)) {
      // allow this to be 0 or 1 indexed
      if (i == 0) {
        continue;
      }

      // no store for this id? we're finished.
      break;
    }

    // find this store's type
    if (!cur_conf->getString("type", cur_type)) {
      LOG_OPER("[%s] MULTI: Store %d is missing type.", categoryHandled.c_str(), i);
      setStatus("MULTI: Store is missing type.");
      return;
    }

    cur_store = createStore(cur_type, categoryHandled, false, multiCategory);
    if (!cur_store) {
      // NOTE(review): assumes createStore() returns an empty pointer for
      // store types it does not know; configure() through it would crash.
      LOG_OPER("[%s] MULTI: Store %d has unknown type %s.",
               categoryHandled.c_str(), i, cur_type.c_str());
      setStatus("MULTI: Store has unknown type.");
      return;
    }

    // add it to the list
    cur_store->configure(cur_conf);
    stores.push_back(cur_store);
    LOG_OPER("[%s] MULTI: Configured store of type %s successfully.",
             categoryHandled.c_str(), cur_type.c_str());
  }

  if (stores.size() == 0) {
    setStatus("MULTI: No stores found, invalid store.");
    LOG_OPER("[%s] MULTI: No stores found, invalid store.", categoryHandled.c_str());
  }
}
2344
 
2345
void MultiStore::close() {
2346
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
2347
       iter != stores.end();
2348
       ++iter) {
2349
    (*iter)->close();
2350
  }
2351
}
2352
 
2353
/**
 * Hands every message to each child store and combines the results
 * according to report_success. SUCCESS_ALL requires every child to succeed;
 * otherwise one successful child is enough.
 *
 * Each child receives its own copy of the message vector. Children such as
 * BucketStore and CategoryStore swap the entries they failed on into the
 * vector they were given. With a shared vector, every later child would
 * only see an earlier child's failures. `messages` itself is left untouched.
 */
bool MultiStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  bool all_result = true;
  bool any_result = false;
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
       iter != stores.end();
       ++iter) {
    // Copies the entry pointers only, not the log entries themselves.
    boost::shared_ptr<logentry_vector_t> batch(new logentry_vector_t(*messages));
    bool cur_result = (*iter)->handleMessages(batch);
    any_result |= cur_result;
    all_result &= cur_result;
  }

  // We cannot accurately report the number of messages not handled as messages
  // can be partially handled by a subset of stores.  So a multistore failure
  // will over-record the number of lost messages.
  return (report_success == SUCCESS_ALL) ? all_result : any_result;
}
2370
 
2371
void MultiStore::periodicCheck() {
2372
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
2373
       iter != stores.end();
2374
       ++iter) {
2375
    (*iter)->periodicCheck();
2376
  }
2377
}
2378
 
2379
void MultiStore::flush() {
2380
  for (std::vector<boost::shared_ptr<Store> >::iterator iter = stores.begin();
2381
       iter != stores.end();
2382
       ++iter) {
2383
    (*iter)->flush();
2384
  }
2385
}
2386
 
2387
/** "category" store: one store per category, each cloned from a model store. */
CategoryStore::CategoryStore(const std::string& category, bool multiCategory)
  : Store(category, "category", multiCategory) {}

/** Used by subclasses to pass their own store type name. */
CategoryStore::CategoryStore(const std::string& category,
                             const std::string& name, bool multiCategory)
  : Store(category, name, multiCategory) {}

CategoryStore::~CategoryStore() {}
2398
 
2399
/**
 * Creates a CategoryStore for `category` whose model is a copy of this
 * store's model. Per-category stores already created are not copied.
 */
boost::shared_ptr<Store> CategoryStore::copy(const std::string &category) {
  CategoryStore *store = new CategoryStore(category, multiCategory);
  // Take ownership right away so the new store is released if copying the
  // model throws.
  boost::shared_ptr<Store> copied(store);

  // modelStore stays empty when configure() found no usable model.
  if (modelStore) {
    store->modelStore = modelStore->copy(category);
  }

  return copied;
}
2406
 
2407
bool CategoryStore::open() {
2408
  bool result = true;
2409
 
2410
  for (map<string, shared_ptr<Store> >::iterator iter = stores.begin();
2411
      iter != stores.end();
2412
      ++iter) {
2413
    result &= iter->second->open();
2414
  }
2415
 
2416
  return result;
2417
}
2418
 
2419
bool CategoryStore::isOpen() {
2420
 
2421
  for (map<string, shared_ptr<Store> >::iterator iter = stores.begin();
2422
      iter != stores.end();
2423
      ++iter) {
2424
    if (!iter->second->isOpen()) {
2425
      return false;
2426
    }
2427
  }
2428
 
2429
  return true;
2430
}
2431
 
2432
/**
 * Parses the <model> store and uses it as the model for a new store for
 * every category seen later:
 *  <store>
 *    type=category
 *    <model>
 *      type=...
 *      ...
 *    </model>
 *  </store>
 * A missing model, or a model without a type, sets the status and leaves the
 * model unset.
 */
void CategoryStore::configure(pStoreConf configuration) {
  pStoreConf model_conf;
  if (!configuration->getStore("model", model_conf)) {
    setStatus("CATEGORYSTORE: NO stores found, invalid store.");
    LOG_OPER("[%s] CATEGORYSTORE: No stores found, invalid store.",
             categoryHandled.c_str());
    return;
  }

  string model_type;
  if (!model_conf->getString("type", model_type)) {
    LOG_OPER("[%s] CATEGORYSTORE: Store is missing type.",
             categoryHandled.c_str());
    setStatus("CATEGORYSTORE: Store is missing type.");
    return;
  }

  configureCommon(model_conf, model_type);
}
2464
 
2465
void CategoryStore::configureCommon(pStoreConf configuration,
2466
                                    const string type) {
2467
  // initialize model store
2468
  modelStore = createStore(type, categoryHandled, false, false);
2469
  LOG_OPER("[%s] %s: Configured store of type %s successfully.",
2470
           categoryHandled.c_str(), getType().c_str(), type.c_str());
2471
  modelStore->configure(configuration);
2472
}
2473
 
2474
void CategoryStore::close() {
2475
  for (map<string, shared_ptr<Store> >::iterator iter = stores.begin();
2476
      iter != stores.end();
2477
      ++iter) {
2478
    iter->second->close();
2479
  }
2480
}
2481
 
2482
/**
 * Routes each message, one at a time, to the store for its category. On the
 * first message of a category, the store is created from the model with
 * modelStore->copy() and opened.
 *
 * Returns true if every message was handled. Otherwise returns false and
 * replaces the contents of `messages` with the entries that failed. An entry
 * fails if no store could be obtained, the store is not open, or the store
 * rejected it.
 */
bool CategoryStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
  shared_ptr<logentry_vector_t> singleMessage(new logentry_vector_t);
  shared_ptr<logentry_vector_t> failed_messages(new logentry_vector_t);

  for (logentry_vector_t::iterator message_iter = messages->begin();
       message_iter != messages->end();
       ++message_iter) {
    string category = (*message_iter)->category;
    shared_ptr<Store> store;

    map<string, shared_ptr<Store> >::iterator store_iter = stores.find(category);
    if (store_iter != stores.end()) {
      store = store_iter->second;
    } else if (modelStore) {
      // Create new store for this category
      store = modelStore->copy(category);
      if (store) {
        store->open();
        // Only cache real stores: open()/close()/flush()/periodicCheck()
        // dereference every map entry.
        stores[category] = store;
      }
    }

    if (!store || !store->isOpen()) {
      LOG_OPER("[%s] Failed to open store for category <%s>",
               categoryHandled.c_str(), category.c_str());
      failed_messages->push_back(*message_iter);
      continue;
    }

    // send this message to the store that handles this category
    singleMessage->clear();
    singleMessage->push_back(*message_iter);

    if (!store->handleMessages(singleMessage)) {
      LOG_OPER("[%s] Failed to handle message for category <%s>",
               categoryHandled.c_str(), category.c_str());
      failed_messages->push_back(*message_iter);
    }
  }

  if (!failed_messages->empty()) {
    // Did not handle all messages, update message vector
    messages->swap(*failed_messages);
    return false;
  }
  return true;
}
2532
 
2533
void CategoryStore::periodicCheck() {
2534
  for (map<string, shared_ptr<Store> >::iterator iter = stores.begin();
2535
      iter != stores.end();
2536
      ++iter) {
2537
    iter->second->periodicCheck();
2538
  }
2539
}
2540
 
2541
void CategoryStore::flush() {
2542
  for (map<string, shared_ptr<Store> >::iterator iter = stores.begin();
2543
      iter != stores.end();
2544
      ++iter) {
2545
    iter->second->flush();
2546
  }
2547
}
2548
 
2549
MultiFileStore::MultiFileStore(const std::string& category, bool multi_category)
2550
  : CategoryStore(category, "MultiFileStore", multi_category) {
2551
}
2552
 
2553
MultiFileStore::~MultiFileStore() {
2554
}
2555
 
2556
void MultiFileStore::configure(pStoreConf configuration) {
2557
  configureCommon(configuration, "file");
2558
}
2559
 
2560
ThriftMultiFileStore::ThriftMultiFileStore(const std::string& category,
2561
                                           bool multi_category)
2562
  : CategoryStore(category, "ThriftMultiFileStore", multi_category) {
2563
}
2564
 
2565
ThriftMultiFileStore::~ThriftMultiFileStore() {
2566
}
2567
 
2568
void ThriftMultiFileStore::configure(pStoreConf configuration) {
2569
  configureCommon(configuration, "thriftfile");
2570
}