Subversion Repositories SmartDukaan

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
301 ashish 1
//  Copyright (c) 2007-2008 Facebook
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Anthony Giardullo
22
 
23
#ifndef SCRIBE_STORE_QUEUE_H
24
#define SCRIBE_STORE_QUEUE_H
25
 
26
#include <string>
27
#include <queue>
28
#include <vector>
29
#include <pthread.h>
30
 
31
#include "src/gen-cpp/scribe.h"
32
#include "store.h"
33
 
34
/*
35
 * This class implements a queue and a thread for dispatching
36
 * events to a store. It creates a store object of the requested
37
 * type, which can in turn create and manage other store objects.
38
 */
39
class StoreQueue {
40
 public:
41
  StoreQueue(const std::string& type, const std::string& category,
42
             unsigned check_period, bool is_model=false, bool multi_category=false);
43
  StoreQueue(const boost::shared_ptr<StoreQueue> example,
44
             const std::string &category);
45
  virtual ~StoreQueue();
46
 
47
  void addMessage(logentry_ptr_t entry);
48
  void configureAndOpen(pStoreConf configuration); // closes first if already open
49
  void open();                                     // closes first if already open
50
  void stop();
51
  boost::shared_ptr<Store> copyStore(const std::string &category);
52
  std::string getStatus(); // An empty string means OK, anything else is an error
53
  std::string getBaseType();
54
  std::string getCategoryHandled();
55
  bool isModelStore() { return isModel;}
56
 
57
  // this needs to be public for the thread creation to get to it,
58
  // but no one else should ever call it.
59
  void threadMember();
60
 
61
  // WARNING: don't expect this to be exact, because it could change after you check.
62
  //          This is only for hueristics to decide when we're overloaded.
63
  unsigned long getSize();
64
 
65
 private:
66
  void storeInitCommon();
67
  void configureInline(pStoreConf configuration);
68
  void openInline();
69
  void processFailedMessages(boost::shared_ptr<logentry_vector_t> messages);
70
 
71
  // implementation of queues and thread
72
  enum store_command_t {
73
    CMD_CONFIGURE,
74
    CMD_OPEN,
75
    CMD_STOP
76
  };
77
 
78
  class StoreCommand {
79
  public:
80
    store_command_t command;
81
    pStoreConf configuration;
82
 
83
    StoreCommand(store_command_t cmd, pStoreConf config = pStoreConf())
84
      : command(cmd), configuration(config) {};
85
  };
86
 
87
  typedef std::queue<StoreCommand> cmd_queue_t;
88
 
89
  // messages and commands are in different queues to allow bulk
90
  // handling of messages. This means that order of commands with
91
  // respect to messages is not preserved.
92
  cmd_queue_t cmdQueue;
93
  boost::shared_ptr<logentry_vector_t> msgQueue;
94
  boost::shared_ptr<logentry_vector_t> failedMessages;
95
  unsigned long msgQueueSize;   // in bytes
96
  pthread_t storeThread;
97
 
98
  // Mutexes
99
  pthread_mutex_t cmdMutex;     // Must be held to read/modify cmdQueue
100
  pthread_mutex_t msgMutex;     // Must be held to read/modify msgQueue
101
  pthread_mutex_t hasWorkMutex; // Must be held to read/modify hasWork
102
  // If acquiring multiple mutexes, always acquire in this order:
103
  // {cmdMutex, msgMutex, hasWorkMutex}
104
 
105
  bool hasWork;  // whether there are messages or commands queued
106
  pthread_cond_t hasWorkCond; // cond variable to wait on for hasWork
107
 
108
  bool stopping;
109
  bool isModel;
110
  bool multiCategory; // Whether multiple categories are handled
111
 
112
  // configuration
113
  std::string   categoryHandled;  // what category this store is handling
114
  time_t        checkPeriod;      // how often to call periodicCheck in seconds
115
  unsigned long targetWriteSize;  // in bytes
116
  time_t        maxWriteInterval; // in seconds
117
  bool          mustSucceed;      // Always retry even if secondary fails
118
 
119
  // Store that will handle messages. This can contain other stores.
120
  boost::shared_ptr<Store> store;
121
};
122
 
123
#endif //!defined SCRIBE_STORE_QUEUE_H