Subversion Repositories SmartDukaan

Rev

Rev 301 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
301 ashish 1
//  Copyright (c) 2007-2008 Facebook
2
//
3
//  Licensed under the Apache License, Version 2.0 (the "License");
4
//  you may not use this file except in compliance with the License.
5
//  You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//  Unless required by applicable law or agreed to in writing, software
10
//  distributed under the License is distributed on an "AS IS" BASIS,
11
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//  See the License for the specific language governing permissions and
13
//  limitations under the License.
14
//
15
// See accompanying file LICENSE or visit the Scribe site at:
16
// http://developers.facebook.com/scribe/
17
//
18
// @author Bobby Johnson
19
// @author James Wang
20
// @author Jason Sobel
21
// @author Avinash Lakshman
22
// @author Anthony Giardullo
23
 
24
#include "common.h"
25
#include "scribe_server.h"
26
 
27
using namespace apache::thrift;
28
using namespace apache::thrift::protocol;
29
using namespace apache::thrift::transport;
30
using namespace apache::thrift::server;
31
using namespace apache::thrift::concurrency;
32
 
33
using namespace facebook::fb303;
34
 
35
using namespace scribe::thrift;
36
using namespace std;
37
using boost::shared_ptr;
38
 
39
shared_ptr<scribeHandler> g_Handler;
40
 
41
#define DEFAULT_CHECK_PERIOD       5
42
#define DEFAULT_MAX_MSG_PER_SECOND 100000
43
#define DEFAULT_MAX_QUEUE_SIZE     5000000
44
#define DEFAULT_SERVER_THREADS     3
45
 
46
void print_usage(const char* program_name) {
47
  cout << "Usage: " << program_name << " [-p port] [-c config_file]" << endl;
48
}
49
 
50
int main(int argc, char **argv) {
51
 
52
  try {
53
    /* Increase number of fds */
54
    struct rlimit r_fd = {65535,65535};
55
    if (-1 == setrlimit(RLIMIT_NOFILE, &r_fd)) {
56
      LOG_OPER("setrlimit error (setting max fd size)");
57
    }
58
 
59
    int next_option;
60
    const char* const short_options = "hp:c:";
61
    const struct option long_options[] = {
62
      { "help",   0, NULL, 'h' },
63
      { "port",   0, NULL, 'p' },
64
      { "config", 0, NULL, 'c' },
65
      { NULL,     0, NULL, 'o' },
66
    };
67
 
68
    unsigned long int port = 0;  // this can also be specified in the conf file, which overrides the command line
69
    std::string config_file;
70
    while (0 < (next_option = getopt_long(argc, argv, short_options, long_options, NULL))) {
71
      switch (next_option) {
72
      default:
73
      case 'h':
74
        print_usage(argv[0]);
75
        exit(0);
76
      case 'c':
77
        config_file = optarg;
78
        break;
79
      case 'p':
80
        port = strtoul(optarg, NULL, 0);
81
        break;
82
      }
83
    }
84
 
85
    // assume a non-option arg is a config file name
86
    if (optind < argc && config_file.empty()) {
87
      config_file = argv[optind];
88
    }
89
 
90
    // seed random number generation with something reasonably unique
91
    srand(time(NULL) ^ getpid());
92
 
93
    g_Handler = shared_ptr<scribeHandler>(new scribeHandler(port, config_file));
94
    g_Handler->initialize();
95
 
96
    shared_ptr<TProcessor> processor(new scribeProcessor(g_Handler));
97
    /* This factory is for binary compatibility. */
98
    shared_ptr<TProtocolFactory>
99
      binaryProtocolFactory(new TBinaryProtocolFactory(0, 0, false, false));
100
    shared_ptr<ThreadManager> thread_manager;
101
 
102
    if (g_Handler->numThriftServerThreads > 1) {
103
      // create a ThreadManager to process incoming calls
104
      thread_manager = ThreadManager::
105
        newSimpleThreadManager(g_Handler->numThriftServerThreads);
106
 
107
      shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory());
108
      thread_manager->threadFactory(thread_factory);
109
      thread_manager->start();
110
    }
111
 
112
    TNonblockingServer server(processor, binaryProtocolFactory,
113
                              g_Handler->port, thread_manager);
114
 
115
    LOG_OPER("Starting scribe server on port %lu", g_Handler->port);
116
    fflush(stderr);
117
 
118
    server.serve();
119
 
120
  } catch(std::exception const& e) {
121
    LOG_OPER("Exception in main: %s", e.what());
122
  }
123
 
124
  LOG_OPER("scribe server exiting");
125
  return 0;
126
}
127
 
128
scribeHandler::scribeHandler(unsigned long int server_port, const std::string& config_file)
129
  : FacebookBase("Scribe"),
130
    port(server_port),
131
    numThriftServerThreads(DEFAULT_SERVER_THREADS),
132
    checkPeriod(DEFAULT_CHECK_PERIOD),
133
    pcategories(NULL),
134
    pcategory_prefixes(NULL),
135
    configFilename(config_file),
136
    status(STARTING),
137
    statusDetails("initial state"),
138
    numMsgLastSecond(0),
139
    maxMsgPerSecond(DEFAULT_MAX_MSG_PER_SECOND),
140
    maxQueueSize(DEFAULT_MAX_QUEUE_SIZE),
141
    newThreadPerCategory(true) {
142
  time(&lastMsgTime);
143
}
144
 
145
scribeHandler::~scribeHandler() {
146
  deleteCategoryMap(pcategories);
147
  if (pcategory_prefixes) {
148
    delete pcategory_prefixes;
149
    pcategory_prefixes = NULL;
150
  }
151
}
152
 
153
// Returns the handler status, but overwrites it with WARNING if it's
154
// ALIVE and at least one store has a nonempty status.
155
fb_status scribeHandler::getStatus() {
156
  RWGuard monitor(scribeHandlerLock);
157
  Guard status_monitor(statusLock);
158
 
159
  fb_status return_status(status);
160
  if (status == ALIVE) {
161
    for (category_map_t::iterator cat_iter = pcategories->begin();
162
        cat_iter != pcategories->end();
163
        ++cat_iter) {
164
      for (store_list_t::iterator store_iter = cat_iter->second->begin();
165
           store_iter != cat_iter->second->end();
166
           ++store_iter)
167
      {
168
        if (!(*store_iter)->getStatus().empty())
169
        {
170
          return_status = WARNING;
171
          return return_status;
172
        }
173
      } // for each store
174
    } // for each category
175
  } // if we don't have an interesting top level status
176
  return return_status;
177
}
178
 
179
void scribeHandler::setStatus(fb_status new_status) {
180
  LOG_OPER("STATUS: %s", statusAsString(new_status));
181
  Guard status_monitor(statusLock);
182
  status = new_status;
183
}
184
 
185
// Returns the handler status details if non-empty,
186
// otherwise the first non-empty store status found
187
void scribeHandler::getStatusDetails(std::string& _return) {
188
  RWGuard monitor(scribeHandlerLock);
189
  Guard status_monitor(statusLock);
190
 
191
  _return = statusDetails;
192
  if (_return.empty()) {
193
    if (pcategories) {
194
      for (category_map_t::iterator cat_iter = pcategories->begin();
195
          cat_iter != pcategories->end();
196
          ++cat_iter) {
197
        for (store_list_t::iterator store_iter = cat_iter->second->begin();
198
            store_iter != cat_iter->second->end();
199
            ++store_iter) {
200
 
201
          if (!(_return = (*store_iter)->getStatus()).empty()) {
202
            return;
203
          }
204
        } // for each store
205
      } // for each category
206
    }
207
  } // if we don't have an interesting top level status
208
  return;
209
}
210
 
211
void scribeHandler::setStatusDetails(const string& new_status_details) {
212
  LOG_OPER("STATUS: %s", new_status_details.c_str());
213
  Guard status_monitor(statusLock);
214
  statusDetails = new_status_details;
215
}
216
 
217
const char* scribeHandler::statusAsString(fb_status status) {
218
  switch (status) {
219
  case DEAD:
220
    return "DEAD";
221
  case STARTING:
222
    return "STARTING";
223
  case ALIVE:
224
    return "ALIVE";
225
  case STOPPING:
226
    return "STOPPING";
227
  case STOPPED:
228
    return "STOPPED";
229
  case WARNING:
230
    return "WARNING";
231
  default:
232
    return "unknown status code";
233
  }
234
}
235
 
236
 
237
// Should be called while holding a writeLock on scribeHandlerLock
238
bool scribeHandler::createCategoryFromModel(
239
  const string &category, const boost::shared_ptr<StoreQueue> &model) {
240
 
241
  if ((pcategories == NULL) ||
242
      (pcategories->find(category) != pcategories->end())) {
243
    return false;
244
  }
245
 
246
  LOG_OPER("[%s] Creating new category from model %s", category.c_str(),
247
           model->getCategoryHandled().c_str());
248
 
249
  // Make sure the category name is sane.
250
  try {
251
    string clean_path = boost::filesystem::path(category).string();
252
 
253
    if (clean_path.compare(category) != 0) {
254
      LOG_OPER("Category not a valid boost filename");
255
      return false;
256
    }
257
 
258
  } catch(std::exception const& e) {
259
    LOG_OPER("Category not a valid boost filename.  Boost exception:%s", e.what());
260
    return false;
261
  }
262
 
263
  shared_ptr<StoreQueue> pstore;
264
  if (newThreadPerCategory) {
265
    // Create a new thread/StoreQueue for this category
266
    pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
267
 
268
    // queue a command to the store to open it
269
    pstore->open();
270
  } else {
271
    // Use existing StoreQueue
272
    pstore = model;
273
  }
274
 
275
  shared_ptr<store_list_t> pstores =
276
    shared_ptr<store_list_t>(new store_list_t);
277
 
278
  (*pcategories)[category] = pstores;
279
  pstores->push_back(pstore);
280
 
281
  return true;
282
}
283
 
284
 
285
// Check if we need to deny this request due to throttling
286
bool scribeHandler::throttleRequest(const vector<LogEntry>&  messages) {
287
  // Check if we need to rate limit
288
  if (throttleDeny(messages.size())) {
289
    incrementCounter("denied for rate");
290
    return true;
291
  }
292
 
293
  if (!pcategories || !pcategory_prefixes) {
294
    // don't bother to spam anything for this, our status should already
295
    // be showing up as WARNING in the monitoring tools.
296
    incrementCounter("invalid requests");
297
    return true;
298
  }
299
 
300
  // Throttle based on store queues getting too long.
301
  // Note that there's one decision for all categories, because the whole array passed to us
302
  // must either succeed or fail together. Checking before we've queued anything also has
303
  // the nice property that any size array will succeed if we're unloaded before attempting
304
  // it, so we won't hit a case where there's a client request that will never succeed.
305
  // Also note that we always check all categories, not just the ones in this request.
306
  // This is a simplification based on the assumption that most Log() calls contain most
307
  // categories.
308
  unsigned long max_count = 0;
309
  for (category_map_t::iterator cat_iter = pcategories->begin();
310
       cat_iter != pcategories->end();
311
       ++cat_iter) {
312
    shared_ptr<store_list_t> pstores = cat_iter->second;
313
    if (!pstores) {
314
      throw std::logic_error("throttle check: iterator in category map holds null pointer");
315
    }
316
    for (store_list_t::iterator store_iter = pstores->begin();
317
         store_iter != pstores->end();
318
         ++store_iter) {
319
      if (*store_iter == NULL) {
320
        throw std::logic_error("throttle check: iterator in store map holds null pointer");
321
      } else {
322
        unsigned long size = (*store_iter)->getSize();
323
        if (size > max_count) {
324
          max_count = size;
325
        }
326
      }
327
    }
328
  }
329
 
330
  if (max_count > maxQueueSize) {
331
    incrementCounter("denied for queue size");
332
    return true;
333
  }
334
 
335
  return false;
336
}
337
 
338
// Should be called while holding a writeLock on scribeHandlerLock
339
shared_ptr<store_list_t> scribeHandler::createNewCategory(
340
  const string& category) {
341
 
342
  shared_ptr<store_list_t> store_list;
343
 
344
  // First, check the list of category prefixes for a model
345
  category_prefix_map_t::iterator cat_prefix_iter = pcategory_prefixes->begin();
346
  while (cat_prefix_iter != pcategory_prefixes->end()) {
347
    string::size_type len = cat_prefix_iter->first.size();
348
    if (cat_prefix_iter->first.compare(0, len-1, category, 0, len-1) == 0) {
349
      // Found a matching prefix model
350
 
351
      createCategoryFromModel(category, cat_prefix_iter->second);
352
      category_map_t::iterator cat_iter = pcategories->find(category);
353
 
354
      if (cat_iter != pcategories->end()) {
355
        store_list = cat_iter->second;
356
      } else {
357
        LOG_OPER("failed to create new prefix store for category <%s>",
358
                 category.c_str());
359
      }
360
 
361
      break;
362
    }
363
    cat_prefix_iter++;
364
  }
365
 
366
  // Then try creating a store if we have a default store defined
367
  if (store_list == NULL) {
368
    if (defaultStore != NULL) {
369
 
370
      createCategoryFromModel(category, defaultStore);
371
      category_map_t::iterator cat_iter = pcategories->find(category);
372
 
373
      if (cat_iter != pcategories->end()) {
374
        store_list = cat_iter->second;
375
      } else {
376
        LOG_OPER("failed to create new default store for category <%s>",
377
                 category.c_str());
378
      }
379
    }
380
  }
381
 
382
  return store_list;
383
}
384
 
385
// Add this message to every store in list
386
void scribeHandler::addMessage(
387
  const LogEntry& entry,
388
  const shared_ptr<store_list_t>& store_list) {
389
 
390
  int numstores = 0;
391
 
392
  // Add message to store_list
393
  for (store_list_t::iterator store_iter = store_list->begin();
394
       store_iter != store_list->end();
395
       ++store_iter) {
396
    ++numstores;
397
    boost::shared_ptr<LogEntry> ptr(new LogEntry);
398
    ptr->category = entry.category;
399
    ptr->message = entry.message;
400
 
401
    (*store_iter)->addMessage(ptr);
402
  }
403
 
404
  if (numstores) {
405
    incrementCounter("received good");
406
  } else {
407
    incrementCounter("received bad");
408
  }
409
}
410
 
411
 
412
ResultCode scribeHandler::Log(const vector<LogEntry>&  messages) {
413
  ResultCode result;
414
 
415
  scribeHandlerLock.acquireRead();
416
 
417
  if (throttleRequest(messages)) {
418
    result = TRY_LATER;
419
    goto end;
420
  }
421
 
422
  for (vector<LogEntry>::const_iterator msg_iter = messages.begin();
423
       msg_iter != messages.end();
424
       ++msg_iter) {
425
 
426
    // disallow blank category from the start
427
    if ((*msg_iter).category.empty()) {
428
      incrementCounter("received blank category");
429
      continue;
430
    }
431
 
432
    shared_ptr<store_list_t> store_list;
433
    string category = (*msg_iter).category;
434
 
435
    // First look for an exact match of the category
436
    if (pcategories) {
437
      category_map_t::iterator cat_iter = pcategories->find(category);
438
      if (cat_iter != pcategories->end()) {
439
        store_list = cat_iter->second;
440
      }
441
    }
442
 
443
    // Try creating a new store for this category if we didn't find one
444
    if (store_list == NULL) {
445
      // Need write lock to create a new category
446
      scribeHandlerLock.release();
447
      scribeHandlerLock.acquireWrite();
448
 
449
      store_list = createNewCategory(category);
450
 
451
      scribeHandlerLock.release();
452
      scribeHandlerLock.acquireRead();
453
    }
454
 
455
    if (store_list == NULL) {
456
       LOG_OPER("log entry has invalid category <%s>",
457
                (*msg_iter).category.c_str());
458
      incrementCounter("received bad");
459
      continue;
460
    }
461
 
462
    // Log this message
463
    addMessage(*msg_iter, store_list);
464
  }
465
 
466
  result = OK;
467
 
468
 end:
469
  scribeHandlerLock.release();
470
  return result;
471
}
472
 
473
// Returns true if overloaded.
474
// Allows a fixed number of messages per second.
475
bool scribeHandler::throttleDeny(int num_messages) {
476
  time_t now;
477
  time(&now);
478
  if (now != lastMsgTime) {
479
    lastMsgTime = now;
480
    numMsgLastSecond = 0;
481
  }
482
 
483
  // If we get a single huge packet it's not cool, but we'd better
484
  // accept it or we'll keep having to read it and deny it indefinitely
485
  if (num_messages > (int)maxMsgPerSecond/2) {
486
    LOG_OPER("throttle allowing rediculously large packet with <%d> messages", num_messages);
487
    return false;
488
  }
489
 
490
  if (numMsgLastSecond + num_messages > maxMsgPerSecond) {
491
    LOG_OPER("throttle denying request with <%d> messages. It would exceed max of <%lu> messages this second",
492
           num_messages, maxMsgPerSecond);
493
    return true;
494
  } else {
495
    numMsgLastSecond += num_messages;
496
    return false;
497
  }
498
}
499
 
500
void scribeHandler::stopStores() {
501
  setStatus(STOPPING);
502
 
503
  // Thrift doesn't currently support stopping the server from the handler,
504
  // so this could leave clients in weird states.
505
  deleteCategoryMap(pcategories);
506
  pcategories = NULL;
507
  if (pcategory_prefixes) {
508
    delete pcategory_prefixes;
509
    pcategory_prefixes = NULL;
510
  }
511
}
512
 
513
void scribeHandler::shutdown() {
514
  RWGuard monitor(scribeHandlerLock, true);
515
 
516
  stopStores();
517
  exit(0);
518
}
519
 
520
void scribeHandler::reinitialize() {
521
  RWGuard monitor(scribeHandlerLock, true);
522
 
523
  // reinitialize() will re-read the config file and re-configure the stores.
524
  // This is done without shutting down the Thrift server, so this will not
525
  // reconfigure any server settings such as port number.
526
  LOG_OPER("reinitializing");
527
  stopStores();
528
  initialize();
529
}
530
 
531
void scribeHandler::initialize() {
532
 
533
  // This clears out the error state, grep for setStatus below for details
534
  setStatus(STARTING);
535
  setStatusDetails("configuring");
536
 
537
  bool perfect_config = true;
538
  bool enough_config_to_run = true;
539
  int numstores = 0;
540
 
541
  pnew_categories = new category_map_t;
542
  pnew_category_prefixes = new category_prefix_map_t;
543
  tmpDefault.reset();
544
 
545
  try {
546
    // Get the config data and parse it.
547
    // If a file has been explicitly specified we'll take the conf from there,
548
    // which is very handy for testing and one-off applications.
549
    // Otherwise we'll try to get it from the service management console and
550
    // fall back to a default file location. This is for production.
551
    StoreConf config;
552
    string config_file;
553
 
554
    if (configFilename.empty()) {
555
      config_file = DEFAULT_CONF_FILE_LOCATION;
556
    } else {
557
      config_file = configFilename;
558
    }
559
    config.parseConfig(config_file);
560
 
561
    // load the global config
562
    config.getUnsigned("max_msg_per_second", maxMsgPerSecond);
563
    config.getUnsigned("max_queue_size", maxQueueSize);
564
    config.getUnsigned("check_interval", checkPeriod);
565
 
566
    // If new_thread_per_category, then we will create a new thread/StoreQueue
567
    // for every unique message category seen.  Otherwise, we will just create
568
    // one thread for each top-level store defined in the config file.
569
    string temp;
570
    config.getString("new_thread_per_category", temp);
571
    if (0 == temp.compare("no")) {
572
      newThreadPerCategory = false;
573
    } else {
574
      newThreadPerCategory = true;
575
    }
576
 
577
    unsigned long int old_port = port;
578
    config.getUnsigned("port", port);
579
    if (old_port != 0 && port != old_port) {
580
      LOG_OPER("port %lu from conf file overriding old port %lu", port, old_port);
581
    }
582
    if (port <= 0) {
583
      throw runtime_error("No port number configured");
584
    }
585
 
586
    // check if config sets the size to use for the ThreadManager
587
    unsigned long int num_threads;
588
    if (config.getUnsigned("num_thrift_server_threads", num_threads)) {
589
      numThriftServerThreads = (size_t) num_threads;
590
 
591
      if (numThriftServerThreads <= 0) {
592
        LOG_OPER("invalid value for num_thrift_server_threads: %lu",
593
                 num_threads);
594
        throw runtime_error("invalid value for num_thrift_server_threads");
595
      }
596
    }
597
 
598
    // Build a new map of stores, and move stores from the old map as
599
    // we find them in the config file. Any stores left in the old map
600
    // at the end will be deleted.
601
    std::vector<pStoreConf> store_confs;
602
    config.getAllStores(store_confs);
603
    for (std::vector<pStoreConf>::iterator iter = store_confs.begin();
604
         iter != store_confs.end();
605
         ++iter) {
606
        pStoreConf store_conf = (*iter);
607
 
608
        bool success = configureStore(store_conf, &numstores);
609
 
610
        if (!success) {
611
          perfect_config = false;
612
        }
613
    }
614
  } catch(std::exception const& e) {
615
    string errormsg("Bad config - exception: ");
616
    errormsg += e.what();
617
    setStatusDetails(errormsg);
618
    perfect_config = false;
619
    enough_config_to_run = false;
620
  }
621
 
622
  if (numstores) {
623
    LOG_OPER("configured <%d> stores", numstores);
624
  } else {
625
    setStatusDetails("No stores configured successfully");
626
    perfect_config = false;
627
    enough_config_to_run = false;
628
  }
629
 
630
  // clean up existing stores
631
  deleteCategoryMap(pcategories);
632
  pcategories = NULL;
633
  if (pcategory_prefixes) {
634
    delete pcategory_prefixes;
635
    pcategory_prefixes = NULL;
636
  }
637
  defaultStore.reset();
638
 
639
  if (enough_config_to_run) {
640
    pcategories = pnew_categories;
641
    pcategory_prefixes = pnew_category_prefixes;
642
    defaultStore = tmpDefault;
643
  } else {
644
    // If the new configuration failed we'll run with
645
    // nothing configured and status set to WARNING
646
    deleteCategoryMap(pnew_categories);
647
    if (pnew_category_prefixes) {
648
      delete pnew_category_prefixes;
649
    }
650
  }
651
 
652
  pnew_categories = NULL;
653
  pnew_category_prefixes = NULL;
654
  tmpDefault.reset();
655
 
656
  if (!perfect_config || !enough_config_to_run) { // perfect should be a subset of enough, but just in case
657
    setStatus(WARNING); // status details should have been set above
658
  } else {
659
    setStatusDetails("");
660
    setStatus(ALIVE);
661
  }
662
}
663
 
664
 
665
// Configures the store specified by the store configuration. Returns false if failed.
666
bool scribeHandler::configureStore(pStoreConf store_conf, int *numstores) {
667
  string category;
668
  shared_ptr<StoreQueue> pstore;
669
  vector<string> category_list;
670
  shared_ptr<StoreQueue> model;
671
  bool single_category = true;
672
 
673
 
674
  // Check if a single category is specified
675
  if (store_conf->getString("category", category)) {
676
    category_list.push_back(category);
677
  }
678
 
679
  // Check if multiple categories are specified
680
  string categories;
681
  if (store_conf->getString("categories", categories)) {
682
    // We want to set up to configure multiple categories, even if there is
683
    // only one category specified here so that configuration is consistent
684
    // for the 'categories' keyword.
685
    single_category = false;
686
 
687
    // Parse category names, separated by whitespace
688
    stringstream ss(categories);
689
 
690
    while (ss >> category) {
691
      category_list.push_back(category);
692
    }
693
  }
694
 
695
  if (category_list.size() == 0) {
696
    setStatusDetails("Bad config - store with no category");
697
    return false;
698
  }
699
  else if (single_category) {
700
    // configure single store
701
    shared_ptr<StoreQueue> result =
702
      configureStoreCategory(store_conf, category_list[0], model);
703
 
704
    if (result == NULL) {
705
      return false;
706
    }
707
 
708
    (*numstores)++;
709
  } else {
710
    // configure multiple stores
711
    string type;
712
 
713
    if (!store_conf->getString("type", type) ||
714
        type.empty()) {
715
      string errormsg("Bad config - no type for store with category: ");
716
      errormsg += categories;
717
      setStatusDetails(errormsg);
718
      return false;
719
    }
720
 
721
    // create model so that we can create stores as copies of this model
722
    model = configureStoreCategory(store_conf, categories, model, true);
723
 
724
    if (model == NULL) {
725
      string errormsg("Bad config - could not create store for category: ");
726
      errormsg += categories;
727
      setStatusDetails(errormsg);
728
      return false;
729
    }
730
 
731
    // create a store for each category
732
    vector<string>::iterator iter;
733
    for (iter = category_list.begin(); iter < category_list.end(); iter++) {
734
       shared_ptr<StoreQueue> result =
735
         configureStoreCategory(store_conf, *iter, model);
736
 
737
      if (!result) {
738
        return false;
739
      }
740
 
741
      (*numstores)++;
742
    }
743
  }
744
 
745
  return true;
746
}
747
 
748
 
749
// Configures the store specified by the store configuration and category.
750
shared_ptr<StoreQueue> scribeHandler::configureStoreCategory(
751
  pStoreConf store_conf,                       //configuration for store
752
  const string &category,                      //category name
753
  const boost::shared_ptr<StoreQueue> &model,  //model to use (optional)
754
  bool category_list) {                        //is a list of stores?
755
 
756
  bool is_default = false;
757
  bool already_created = false;
758
 
759
  if (category.empty()) {
760
    setStatusDetails("Bad config - store with blank category");
761
    return shared_ptr<StoreQueue>();
762
  }
763
 
764
  LOG_OPER("CATEGORY : %s", category.c_str());
765
  if (0 == category.compare("default")) {
766
    if (tmpDefault != NULL) {
767
      setStatusDetails("Bad config - multiple default stores specified");
768
      return shared_ptr<StoreQueue>();
769
    }
770
    is_default = true;
771
  }
772
 
773
  bool is_prefix_category = (!category.empty() &&
774
                             category[category.size() - 1] == '*' &&
775
                             !category_list);
776
 
777
  std::string type;
778
  if (!store_conf->getString("type", type) ||
779
      type.empty()) {
780
    string errormsg("Bad config - no type for store with category: ");
781
    errormsg += category;
782
    setStatusDetails(errormsg);
783
    return shared_ptr<StoreQueue>();
784
  }
785
 
786
  // look for the store in the current list
787
  shared_ptr<StoreQueue> pstore;
788
  if (!is_prefix_category && pcategories) {
789
    category_map_t::iterator category_iter = pcategories->find(category);
790
    if (category_iter != pcategories->end()) {
791
      shared_ptr<store_list_t> pstores = category_iter->second;
792
 
793
      for ( store_list_t::iterator it = pstores->begin(); it != pstores->end(); ++it ) {
794
        if ( (*it)->getBaseType() == type &&
795
             pstores->size() <=  1) { // no good way to match them up if there's more than one
796
          pstore = (*it);
797
          pstores->erase(it);
798
        }
799
      }
800
    }
801
  }
802
 
803
  try {
804
    // create a new store if it doesn't already exist
805
    if (!pstore) {
806
      if (model != NULL) {
807
        // Create a copy of the model if we want a new thread per category
808
        if (newThreadPerCategory && !is_default && !is_prefix_category) {
809
          pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
810
        } else {
811
          pstore = model;
812
          already_created = true;
813
        }
814
      } else {
815
        string store_name;
816
        bool is_model, multi_category, categories;
817
 
818
        /* remove any *'s from category name */
819
        if (is_prefix_category)
820
          store_name = category.substr(0, category.size() - 1);
821
        else
822
          store_name = category;
823
 
824
        // Does this store define multiple categories
825
        categories = (is_default || is_prefix_category || category_list);
826
 
827
        // Determine if this store will actually handle multiple categories
828
        multi_category = !newThreadPerCategory && categories;
829
 
830
        // Determine if this store is just a model for later stores
831
        is_model = newThreadPerCategory && categories;
832
 
833
        pstore =
834
          shared_ptr<StoreQueue>(new StoreQueue(type, store_name, checkPeriod,
835
                                                is_model, multi_category));
836
      }
837
    }
838
  } catch (...) {
839
    pstore.reset();
840
  }
841
 
842
  if (!pstore) {
843
    string errormsg("Bad config - can't create a store of type: ");
844
    errormsg += type;
845
    setStatusDetails(errormsg);
846
    return shared_ptr<StoreQueue>();
847
  }
848
 
849
  // open store. and configure it if not copied from a model
850
  if (model == NULL) {
851
    pstore->configureAndOpen(store_conf);
852
  } else if (!already_created) {
853
    pstore->open();
854
  }
855
 
856
  if (is_default) {
857
    LOG_OPER("Creating default store");
858
    tmpDefault = pstore;
859
  }
860
  else if (is_prefix_category) {
861
    category_prefix_map_t::iterator category_iter =
862
      pnew_category_prefixes->find(category);
863
 
864
    if (category_iter == pnew_category_prefixes->end()) {
865
      (*pnew_category_prefixes)[category] = pstore;
866
    } else {
867
      string errormsg =
868
        "Bad config - multiple prefix stores specified for category: ";
869
 
870
      errormsg += category;
871
      setStatusDetails(errormsg);
872
      return shared_ptr<StoreQueue>();
873
    }
874
  }
875
 
876
  // push the new store onto the new map if it's not just a model
877
  if (!pstore->isModelStore() && !category_list) {
878
    shared_ptr<store_list_t> pstores;
879
    category_map_t::iterator category_iter = pnew_categories->find(category);
880
    if (category_iter != pnew_categories->end()) {
881
      pstores = category_iter->second;
882
    } else {
883
      pstores = shared_ptr<store_list_t>(new store_list_t);
884
      (*pnew_categories)[category] = pstores;
885
    }
886
        pstores->push_back(pstore);
887
  }
888
 
889
  return pstore;
890
}
891
 
892
 
893
// delete pcats and everything it contains
894
void scribeHandler::deleteCategoryMap(category_map_t *pcats) {
895
  if (!pcats) {
896
    return;
897
  }
898
  for (category_map_t::iterator cat_iter = pcats->begin();
899
       cat_iter != pcats->end();
900
       ++cat_iter) {
901
    shared_ptr<store_list_t> pstores = cat_iter->second;
902
    if (!pstores) {
903
      throw std::logic_error("deleteCategoryMap: iterator in category map holds null pointer");
904
    }
905
    for (store_list_t::iterator store_iter = pstores->begin();
906
         store_iter != pstores->end();
907
         ++store_iter) {
908
      if (!*store_iter) {
909
        throw std::logic_error("deleteCategoryMap: iterator in store map holds null pointer");
910
      }
911
 
912
      (*store_iter)->stop();
913
    } // for each store
914
    pstores->clear();
915
  } // for each category
916
  pcats->clear();
917
  delete pcats;
918
}