| 301 |
ashish |
1 |
// Copyright (c) 2007-2008 Facebook
|
|
|
2 |
//
|
|
|
3 |
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
4 |
// you may not use this file except in compliance with the License.
|
|
|
5 |
// You may obtain a copy of the License at
|
|
|
6 |
//
|
|
|
7 |
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
8 |
//
|
|
|
9 |
// Unless required by applicable law or agreed to in writing, software
|
|
|
10 |
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
11 |
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
12 |
// See the License for the specific language governing permissions and
|
|
|
13 |
// limitations under the License.
|
|
|
14 |
//
|
|
|
15 |
// See accompanying file LICENSE or visit the Scribe site at:
|
|
|
16 |
// http://developers.facebook.com/scribe/
|
|
|
17 |
//
|
|
|
18 |
// @author Bobby Johnson
|
|
|
19 |
// @author James Wang
|
|
|
20 |
// @author Jason Sobel
|
|
|
21 |
// @author Anthony Giardullo
|
|
|
22 |
|
|
|
23 |
#include "common.h"
|
|
|
24 |
#include "scribe_server.h"
|
|
|
25 |
|
|
|
26 |
using namespace std;
|
|
|
27 |
using namespace boost;
|
|
|
28 |
using namespace scribe::thrift;
|
|
|
29 |
|
|
|
30 |
#define DEFAULT_TARGET_WRITE_SIZE 16384
|
|
|
31 |
#define DEFAULT_MAX_WRITE_INTERVAL 10
|
|
|
32 |
|
|
|
33 |
void* threadStatic(void *this_ptr) {
|
|
|
34 |
StoreQueue *queue_ptr = (StoreQueue*)this_ptr;
|
|
|
35 |
queue_ptr->threadMember();
|
|
|
36 |
return NULL;
|
|
|
37 |
}
|
|
|
38 |
|
|
|
39 |
// Builds a queue around a store newly created by Store::createStore for the
// given type and category. Write size and write interval start at the
// DEFAULT_* values and mustSucceed starts out true.
//
// Throws std::runtime_error when createStore does not return a store.
StoreQueue::StoreQueue(const string& type, const string& category,
                       unsigned check_period, bool is_model, bool multi_category)
  : msgQueueSize(0),
    hasWork(false),
    stopping(false),
    isModel(is_model),
    multiCategory(multi_category),
    categoryHandled(category),
    checkPeriod(check_period),
    targetWriteSize(DEFAULT_TARGET_WRITE_SIZE),
    maxWriteInterval(DEFAULT_MAX_WRITE_INTERVAL),
    mustSucceed(true)
{
  store = Store::createStore(type, category, false, multiCategory);
  if (store) {
    // Shared setup (queue, locks, worker thread) for non-model queues.
    storeInitCommon();
  } else {
    throw std::runtime_error("createStore failed in StoreQueue constructor. Invalid type?");
  }
}
|
|
|
58 |
|
|
|
59 |
// Builds a queue for `category` using `example` as a template: the tuning
// parameters (multiCategory, checkPeriod, targetWriteSize, maxWriteInterval,
// mustSucceed) are copied from it and the store is obtained through
// example->copyStore(category). The new queue is never a model itself.
//
// Throws std::runtime_error when the copy yields no store.
StoreQueue::StoreQueue(const shared_ptr<StoreQueue> example,
                       const std::string &category)
  : msgQueueSize(0),
    hasWork(false),
    stopping(false),
    isModel(false),
    multiCategory(example->multiCategory),
    categoryHandled(category),
    checkPeriod(example->checkPeriod),
    targetWriteSize(example->targetWriteSize),
    maxWriteInterval(example->maxWriteInterval),
    mustSucceed(example->mustSucceed)
{
  store = example->copyStore(category);
  if (store) {
    // Shared setup (queue, locks, worker thread) for non-model queues.
    storeInitCommon();
  } else {
    throw std::runtime_error("createStore failed copying model store");
  }
}
|
|
|
78 |
|
|
|
79 |
|
|
|
80 |
// Releases the pthread primitives created by storeInitCommon(). A model
// queue never initialized them, so there is nothing to release for it.
StoreQueue::~StoreQueue() {
  if (isModel) {
    return;
  }

  pthread_mutex_destroy(&cmdMutex);
  pthread_mutex_destroy(&msgMutex);
  pthread_mutex_destroy(&hasWorkMutex);
  pthread_cond_destroy(&hasWorkCond);
}
|
|
|
88 |
|
|
|
89 |
// WARNING: the number could change after you check this, so don't
|
|
|
90 |
// expect it to be exact. Use for hueristics ONLY.
|
|
|
91 |
unsigned long StoreQueue::getSize() {
|
|
|
92 |
unsigned long retval;
|
|
|
93 |
pthread_mutex_lock(&msgMutex);
|
|
|
94 |
retval = msgQueueSize;
|
|
|
95 |
pthread_mutex_unlock(&msgMutex);
|
|
|
96 |
return retval;
|
|
|
97 |
}
|
|
|
98 |
|
|
|
99 |
// Appends `entry` to the pending message queue and adds its message length
// to msgQueueSize. Once the accumulated size reaches targetWriteSize the
// store thread is signalled; otherwise the entry just waits in the queue.
// Calling this on a model store only logs an error.
void StoreQueue::addMessage(boost::shared_ptr<LogEntry> entry) {
  if (isModel) {
    LOG_OPER("ERROR: called addMessage on model store");
    return;
  }

  pthread_mutex_lock(&msgMutex);
  msgQueue->push_back(entry);
  msgQueueSize += entry->message.size();
  const bool reachedTarget = (msgQueueSize >= targetWriteSize);
  pthread_mutex_unlock(&msgMutex);

  // Not enough queued data yet: leave the store thread alone.
  if (!reachedTarget) {
    return;
  }

  // Raise the work flag and signal, unless the flag is already raised.
  pthread_mutex_lock(&hasWorkMutex);
  if (!hasWork) {
    hasWork = true;
    pthread_cond_signal(&hasWorkCond);
  }
  pthread_mutex_unlock(&hasWorkMutex);
}
|
|
|
124 |
|
|
|
125 |
// Applies `configuration` to this queue. A model store is configured
// immediately on the calling thread; any other queue gets a CMD_CONFIGURE
// command enqueued and its store thread signalled.
void StoreQueue::configureAndOpen(pStoreConf configuration) {
  if (isModel) {
    // No command queue is set up for a model store, so do it right here.
    configureInline(configuration);
    return;
  }

  pthread_mutex_lock(&cmdMutex);
  cmdQueue.push(StoreCommand(CMD_CONFIGURE, configuration));
  pthread_mutex_unlock(&cmdMutex);

  // Raise the work flag and signal, unless the flag is already raised.
  pthread_mutex_lock(&hasWorkMutex);
  if (!hasWork) {
    hasWork = true;
    pthread_cond_signal(&hasWorkCond);
  }
  pthread_mutex_unlock(&hasWorkMutex);
}
|
|
|
144 |
|
|
|
145 |
void StoreQueue::stop() {
|
|
|
146 |
if (isModel) {
|
|
|
147 |
LOG_OPER("ERROR: called stop() on model store");
|
|
|
148 |
} else if(!stopping) {
|
|
|
149 |
pthread_mutex_lock(&cmdMutex);
|
|
|
150 |
StoreCommand cmd(CMD_STOP);
|
|
|
151 |
cmdQueue.push(cmd);
|
|
|
152 |
stopping = true;
|
|
|
153 |
pthread_mutex_unlock(&cmdMutex);
|
|
|
154 |
|
|
|
155 |
// signal that there is work to do if not already signaled
|
|
|
156 |
pthread_mutex_lock(&hasWorkMutex);
|
|
|
157 |
if (!hasWork) {
|
|
|
158 |
hasWork = true;
|
|
|
159 |
pthread_cond_signal(&hasWorkCond);
|
|
|
160 |
}
|
|
|
161 |
pthread_mutex_unlock(&hasWorkMutex);
|
|
|
162 |
|
|
|
163 |
pthread_join(storeThread, NULL);
|
|
|
164 |
}
|
|
|
165 |
}
|
|
|
166 |
|
|
|
167 |
void StoreQueue::open() {
|
|
|
168 |
if (isModel) {
|
|
|
169 |
LOG_OPER("ERROR: called open() on model store");
|
|
|
170 |
} else {
|
|
|
171 |
pthread_mutex_lock(&cmdMutex);
|
|
|
172 |
StoreCommand cmd(CMD_OPEN);
|
|
|
173 |
cmdQueue.push(cmd);
|
|
|
174 |
pthread_mutex_unlock(&cmdMutex);
|
|
|
175 |
|
|
|
176 |
// signal that there is work to do if not already signaled
|
|
|
177 |
pthread_mutex_lock(&hasWorkMutex);
|
|
|
178 |
if (!hasWork) {
|
|
|
179 |
hasWork = true;
|
|
|
180 |
pthread_cond_signal(&hasWorkCond);
|
|
|
181 |
}
|
|
|
182 |
pthread_mutex_unlock(&hasWorkMutex);
|
|
|
183 |
}
|
|
|
184 |
}
|
|
|
185 |
|
|
|
186 |
// Returns whatever the underlying store's copy(category) produces.
shared_ptr<Store> StoreQueue::copyStore(const std::string &category) {
  shared_ptr<Store> duplicate = store->copy(category);
  return duplicate;
}
|
|
|
189 |
|
|
|
190 |
std::string StoreQueue::getCategoryHandled() {
|
|
|
191 |
return categoryHandled;
|
|
|
192 |
}
|
|
|
193 |
|
|
|
194 |
|
|
|
195 |
std::string StoreQueue::getStatus() {
|
|
|
196 |
return store->getStatus();
|
|
|
197 |
}
|
|
|
198 |
|
|
|
199 |
std::string StoreQueue::getBaseType() {
|
|
|
200 |
return store->getType();
|
|
|
201 |
}
|
|
|
202 |
|
|
|
203 |
void StoreQueue::threadMember() {
|
|
|
204 |
LOG_OPER("store thread starting");
|
|
|
205 |
|
|
|
206 |
if (isModel) {
|
|
|
207 |
LOG_OPER("ERROR: store thread starting on model store, exiting");
|
|
|
208 |
return;
|
|
|
209 |
}
|
|
|
210 |
|
|
|
211 |
if (!store) {
|
|
|
212 |
LOG_OPER("store is NULL, store thread exiting");
|
|
|
213 |
return;
|
|
|
214 |
}
|
|
|
215 |
|
|
|
216 |
// init time of last periodic check to time of 0
|
|
|
217 |
time_t last_periodic_check = 0;
|
|
|
218 |
|
|
|
219 |
time_t last_handle_messages;
|
|
|
220 |
time(&last_handle_messages);
|
|
|
221 |
|
|
|
222 |
// initialize absolute timestamp
|
|
|
223 |
struct timespec abs_timeout;
|
|
|
224 |
memset(&abs_timeout, 0, sizeof(struct timespec));
|
|
|
225 |
|
|
|
226 |
bool stop = false;
|
|
|
227 |
bool open = false;
|
|
|
228 |
while (!stop) {
|
|
|
229 |
|
|
|
230 |
// handle commands
|
|
|
231 |
//
|
|
|
232 |
pthread_mutex_lock(&cmdMutex);
|
|
|
233 |
while (!cmdQueue.empty()) {
|
|
|
234 |
StoreCommand cmd = cmdQueue.front();
|
|
|
235 |
cmdQueue.pop();
|
|
|
236 |
|
|
|
237 |
switch (cmd.command) {
|
|
|
238 |
case CMD_CONFIGURE:
|
|
|
239 |
configureInline(cmd.configuration);
|
|
|
240 |
openInline();
|
|
|
241 |
open = true;
|
|
|
242 |
break;
|
|
|
243 |
case CMD_OPEN:
|
|
|
244 |
openInline();
|
|
|
245 |
open = true;
|
|
|
246 |
break;
|
|
|
247 |
case CMD_STOP:
|
|
|
248 |
stop = true;
|
|
|
249 |
break;
|
|
|
250 |
default:
|
|
|
251 |
LOG_OPER("LOGIC ERROR: unknown command to store queue");
|
|
|
252 |
break;
|
|
|
253 |
}
|
|
|
254 |
}
|
|
|
255 |
|
|
|
256 |
// handle periodic tasks
|
|
|
257 |
//
|
|
|
258 |
time_t this_loop;
|
|
|
259 |
time(&this_loop);
|
|
|
260 |
if (!stop && open && this_loop - last_periodic_check > checkPeriod) {
|
|
|
261 |
store->periodicCheck();
|
|
|
262 |
last_periodic_check = this_loop;
|
|
|
263 |
}
|
|
|
264 |
|
|
|
265 |
pthread_mutex_lock(&msgMutex);
|
|
|
266 |
pthread_mutex_unlock(&cmdMutex);
|
|
|
267 |
|
|
|
268 |
boost::shared_ptr<logentry_vector_t> messages;
|
|
|
269 |
|
|
|
270 |
// handle messages if stopping, enough time has passed, or queue is large
|
|
|
271 |
//
|
|
|
272 |
if (stop ||
|
|
|
273 |
(this_loop - last_handle_messages > maxWriteInterval) ||
|
|
|
274 |
msgQueueSize >= targetWriteSize) {
|
|
|
275 |
|
|
|
276 |
if (failedMessages) {
|
|
|
277 |
// process any messages we were not able to process last time
|
|
|
278 |
messages = failedMessages;
|
|
|
279 |
failedMessages = boost::shared_ptr<logentry_vector_t>();
|
|
|
280 |
} else if (msgQueueSize > 0) {
|
|
|
281 |
// process message in queue
|
|
|
282 |
messages = msgQueue;
|
|
|
283 |
msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);
|
|
|
284 |
msgQueueSize = 0;
|
|
|
285 |
}
|
|
|
286 |
|
|
|
287 |
// reset timer
|
|
|
288 |
last_handle_messages = this_loop;
|
|
|
289 |
}
|
|
|
290 |
|
|
|
291 |
pthread_mutex_unlock(&msgMutex);
|
|
|
292 |
|
|
|
293 |
if (messages) {
|
|
|
294 |
if (!store->handleMessages(messages)) {
|
|
|
295 |
// Store could not handle these messages
|
|
|
296 |
processFailedMessages(messages);
|
|
|
297 |
}
|
|
|
298 |
store->flush();
|
|
|
299 |
}
|
|
|
300 |
|
|
|
301 |
if (!stop) {
|
|
|
302 |
// set timeout to when we need to handle messages or do a periodic check
|
|
|
303 |
abs_timeout.tv_sec = min(last_periodic_check + checkPeriod,
|
|
|
304 |
last_handle_messages + maxWriteInterval);
|
|
|
305 |
|
|
|
306 |
// must wait until after this time
|
|
|
307 |
abs_timeout.tv_sec++;
|
|
|
308 |
|
|
|
309 |
// wait until there's some work to do or we timeout
|
|
|
310 |
pthread_mutex_lock(&hasWorkMutex);
|
|
|
311 |
if (!hasWork) {
|
|
|
312 |
pthread_cond_timedwait(&hasWorkCond, &hasWorkMutex, &abs_timeout);
|
|
|
313 |
}
|
|
|
314 |
hasWork = false;
|
|
|
315 |
pthread_mutex_unlock(&hasWorkMutex);
|
|
|
316 |
}
|
|
|
317 |
|
|
|
318 |
} // while (!stop)
|
|
|
319 |
|
|
|
320 |
store->close();
|
|
|
321 |
}
|
|
|
322 |
|
|
|
323 |
// Decides what happens to a batch the store refused: with mustSucceed set
// the batch is parked in failedMessages so the store thread can retry it;
// otherwise it is dropped. Either way the outcome is logged and counted
// ("requeue" or "lost") through g_Handler.
void StoreQueue::processFailedMessages(shared_ptr<logentry_vector_t> messages) {
  if (!mustSucceed) {
    // Give up on these messages and record them as lost.
    LOG_OPER("[%s] WARNING: Lost %lu messages!",
             categoryHandled.c_str(), messages->size());
    g_Handler->incrementCounter("lost", messages->size());
    return;
  }

  // Keep the batch around for another attempt.
  failedMessages = messages;

  LOG_OPER("[%s] WARNING: Re-queueing %lu messages!",
           categoryHandled.c_str(), messages->size());
  g_Handler->incrementCounter("requeue", messages->size());
}
|
|
|
341 |
|
|
|
342 |
void StoreQueue::storeInitCommon() {
|
|
|
343 |
// model store doesn't need this stuff
|
|
|
344 |
if (!isModel) {
|
|
|
345 |
msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);
|
|
|
346 |
pthread_mutex_init(&cmdMutex, NULL);
|
|
|
347 |
pthread_mutex_init(&msgMutex, NULL);
|
|
|
348 |
pthread_mutex_init(&hasWorkMutex, NULL);
|
|
|
349 |
pthread_cond_init(&hasWorkCond, NULL);
|
|
|
350 |
|
|
|
351 |
pthread_create(&storeThread, NULL, threadStatic, (void*) this);
|
|
|
352 |
}
|
|
|
353 |
}
|
|
|
354 |
|
|
|
355 |
// Applies queue-level settings from `configuration` and then forwards the
// configuration to the underlying store.
//
// Keys read here:
//   "target_write_size"  -> targetWriteSize
//   "max_write_interval" -> maxWriteInterval
//   "must_succeed"       -> mustSucceed is cleared only for the value "no"
void StoreQueue::configureInline(pStoreConf configuration) {
  // Constructor defaults are fine if these don't exist.
  //
  // Each value is read through a real unsigned long and then assigned to the
  // member, instead of casting the member to `unsigned long&`: that cast
  // type-puns the member and is only valid if it is exactly unsigned long.
  // NOTE(review): assumes getUnsigned leaves its output argument untouched
  // when the key is missing, as the original code relied on - confirm.
  unsigned long writeSize = static_cast<unsigned long>(targetWriteSize);
  configuration->getUnsigned("target_write_size", writeSize);
  targetWriteSize = writeSize;

  unsigned long writeInterval = static_cast<unsigned long>(maxWriteInterval);
  configuration->getUnsigned("max_write_interval", writeInterval);
  maxWriteInterval = writeInterval;

  string tmp;
  if (configuration->getString("must_succeed", tmp) && tmp == "no") {
    mustSucceed = false;
  }

  store->configure(configuration);
}
|
|
|
367 |
|
|
|
368 |
void StoreQueue::openInline() {
|
|
|
369 |
if (store->isOpen()) {
|
|
|
370 |
store->close();
|
|
|
371 |
}
|
|
|
372 |
if (!isModel) {
|
|
|
373 |
store->open();
|
|
|
374 |
}
|
|
|
375 |
}
|