Subversion Repositories SmartDukaan

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
301 ashish 1
//  Copyright (c) 2007-2008 Facebook
2
//
3
//  Licensed under the Apache License, Version 2.0 (the "License");
4
//  you may not use this file except in compliance with the License.
5
//  You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//  Unless required by applicable law or agreed to in writing, software
10
//  distributed under the License is distributed on an "AS IS" BASIS,
11
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//  See the License for the specific language governing permissions and
13
//  limitations under the License.
14
//
15
// See accompanying file LICENSE or visit the Scribe site at:
16
// http://developers.facebook.com/scribe/
17
//
18
// @author Bobby Johnson
19
// @author James Wang
20
// @author Jason Sobel
21
// @author Anthony Giardullo
22
 
23
#include "common.h"
24
#include "scribe_server.h"
25
 
26
using namespace std;
27
using namespace boost;
28
using namespace scribe::thrift;
29
 
30
#define DEFAULT_TARGET_WRITE_SIZE  16384
31
#define DEFAULT_MAX_WRITE_INTERVAL 10
32
 
33
void* threadStatic(void *this_ptr) {
34
  StoreQueue *queue_ptr = (StoreQueue*)this_ptr;
35
  queue_ptr->threadMember();
36
  return NULL;
37
}
38
 
39
StoreQueue::StoreQueue(const string& type, const string& category,
40
                       unsigned check_period, bool is_model, bool multi_category)
41
  : msgQueueSize(0),
42
    hasWork(false),
43
    stopping(false),
44
    isModel(is_model),
45
    multiCategory(multi_category),
46
    categoryHandled(category),
47
    checkPeriod(check_period),
48
    targetWriteSize(DEFAULT_TARGET_WRITE_SIZE),
49
    maxWriteInterval(DEFAULT_MAX_WRITE_INTERVAL),
50
    mustSucceed(true) {
51
 
52
  store = Store::createStore(type, category, false, multiCategory);
53
  if (!store) {
54
    throw std::runtime_error("createStore failed in StoreQueue constructor. Invalid type?");
55
  }
56
  storeInitCommon();
57
}
58
 
59
StoreQueue::StoreQueue(const shared_ptr<StoreQueue> example,
60
                       const std::string &category)
61
  : msgQueueSize(0),
62
    hasWork(false),
63
    stopping(false),
64
    isModel(false),
65
    multiCategory(example->multiCategory),
66
    categoryHandled(category),
67
    checkPeriod(example->checkPeriod),
68
    targetWriteSize(example->targetWriteSize),
69
    maxWriteInterval(example->maxWriteInterval),
70
    mustSucceed(example->mustSucceed) {
71
 
72
  store = example->copyStore(category);
73
  if (!store) {
74
    throw std::runtime_error("createStore failed copying model store");
75
  }
76
  storeInitCommon();
77
}
78
 
79
 
80
StoreQueue::~StoreQueue() {
81
  if (!isModel) {
82
    pthread_mutex_destroy(&cmdMutex);
83
    pthread_mutex_destroy(&msgMutex);
84
    pthread_mutex_destroy(&hasWorkMutex);
85
    pthread_cond_destroy(&hasWorkCond);
86
  }
87
}
88
 
89
// WARNING: the number could change after you check this, so don't
90
// expect it to be exact. Use for hueristics ONLY.
91
unsigned long StoreQueue::getSize() {
92
  unsigned long retval;
93
  pthread_mutex_lock(&msgMutex);
94
  retval = msgQueueSize;
95
  pthread_mutex_unlock(&msgMutex);
96
  return retval;
97
}
98
 
99
void StoreQueue::addMessage(boost::shared_ptr<LogEntry> entry) {
100
  if (isModel) {
101
    LOG_OPER("ERROR: called addMessage on model store");
102
  } else {
103
    bool waitForWork = false;
104
 
105
    pthread_mutex_lock(&msgMutex);
106
    msgQueue->push_back(entry);
107
    msgQueueSize += entry->message.size();
108
 
109
    waitForWork = (msgQueueSize >= targetWriteSize) ? true : false;
110
    pthread_mutex_unlock(&msgMutex);
111
 
112
    // Wake up store thread if we have enough messages
113
    if (waitForWork == true) {
114
      // signal that there is work to do if not already signaled
115
      pthread_mutex_lock(&hasWorkMutex);
116
      if (!hasWork) {
117
        hasWork = true;
118
        pthread_cond_signal(&hasWorkCond);
119
      }
120
      pthread_mutex_unlock(&hasWorkMutex);
121
    }
122
  }
123
}
124
 
125
void StoreQueue::configureAndOpen(pStoreConf configuration) {
126
  // model store has to handle this inline since it has no queue
127
  if (isModel) {
128
    configureInline(configuration);
129
  } else {
130
    pthread_mutex_lock(&cmdMutex);
131
    StoreCommand cmd(CMD_CONFIGURE, configuration);
132
    cmdQueue.push(cmd);
133
    pthread_mutex_unlock(&cmdMutex);
134
 
135
    // signal that there is work to do if not already signaled
136
    pthread_mutex_lock(&hasWorkMutex);
137
    if (!hasWork) {
138
      hasWork = true;
139
      pthread_cond_signal(&hasWorkCond);
140
    }
141
    pthread_mutex_unlock(&hasWorkMutex);
142
  }
143
}
144
 
145
void StoreQueue::stop() {
146
  if (isModel) {
147
    LOG_OPER("ERROR: called stop() on model store");
148
  } else if(!stopping) {
149
    pthread_mutex_lock(&cmdMutex);
150
    StoreCommand cmd(CMD_STOP);
151
    cmdQueue.push(cmd);
152
    stopping = true;
153
    pthread_mutex_unlock(&cmdMutex);
154
 
155
    // signal that there is work to do if not already signaled
156
    pthread_mutex_lock(&hasWorkMutex);
157
    if (!hasWork) {
158
      hasWork = true;
159
      pthread_cond_signal(&hasWorkCond);
160
    }
161
    pthread_mutex_unlock(&hasWorkMutex);
162
 
163
    pthread_join(storeThread, NULL);
164
  }
165
}
166
 
167
void StoreQueue::open() {
168
  if (isModel) {
169
    LOG_OPER("ERROR: called open() on model store");
170
  } else {
171
    pthread_mutex_lock(&cmdMutex);
172
    StoreCommand cmd(CMD_OPEN);
173
    cmdQueue.push(cmd);
174
    pthread_mutex_unlock(&cmdMutex);
175
 
176
    // signal that there is work to do if not already signaled
177
    pthread_mutex_lock(&hasWorkMutex);
178
    if (!hasWork) {
179
      hasWork = true;
180
      pthread_cond_signal(&hasWorkCond);
181
    }
182
    pthread_mutex_unlock(&hasWorkMutex);
183
  }
184
}
185
 
186
shared_ptr<Store> StoreQueue::copyStore(const std::string &category) {
187
  return store->copy(category);
188
}
189
 
190
std::string StoreQueue::getCategoryHandled() {
191
  return categoryHandled;
192
}
193
 
194
 
195
std::string StoreQueue::getStatus() {
196
  return store->getStatus();
197
}
198
 
199
std::string StoreQueue::getBaseType() {
200
  return store->getType();
201
}
202
 
203
void StoreQueue::threadMember() {
204
  LOG_OPER("store thread starting");
205
 
206
  if (isModel) {
207
    LOG_OPER("ERROR: store thread starting on model store, exiting");
208
    return;
209
  }
210
 
211
  if (!store) {
212
    LOG_OPER("store is NULL, store thread exiting");
213
    return;
214
  }
215
 
216
  // init time of last periodic check to time of 0
217
  time_t last_periodic_check = 0;
218
 
219
  time_t last_handle_messages;
220
  time(&last_handle_messages);
221
 
222
  // initialize absolute timestamp
223
  struct timespec abs_timeout;
224
  memset(&abs_timeout, 0, sizeof(struct timespec));
225
 
226
  bool stop = false;
227
  bool open = false;
228
  while (!stop) {
229
 
230
    // handle commands
231
    //
232
    pthread_mutex_lock(&cmdMutex);
233
    while (!cmdQueue.empty()) {
234
      StoreCommand cmd = cmdQueue.front();
235
      cmdQueue.pop();
236
 
237
      switch (cmd.command) {
238
      case CMD_CONFIGURE:
239
        configureInline(cmd.configuration);
240
        openInline();
241
        open = true;
242
        break;
243
      case CMD_OPEN:
244
        openInline();
245
        open = true;
246
        break;
247
      case CMD_STOP:
248
        stop = true;
249
        break;
250
      default:
251
        LOG_OPER("LOGIC ERROR: unknown command to store queue");
252
        break;
253
      }
254
    }
255
 
256
    // handle periodic tasks
257
    //
258
    time_t this_loop;
259
    time(&this_loop);
260
    if (!stop && open && this_loop - last_periodic_check > checkPeriod) {
261
      store->periodicCheck();
262
      last_periodic_check = this_loop;
263
    }
264
 
265
    pthread_mutex_lock(&msgMutex);
266
    pthread_mutex_unlock(&cmdMutex);
267
 
268
    boost::shared_ptr<logentry_vector_t> messages;
269
 
270
    // handle messages if stopping, enough time has passed, or queue is large
271
    //
272
    if (stop ||
273
        (this_loop - last_handle_messages > maxWriteInterval) ||
274
        msgQueueSize >= targetWriteSize) {
275
 
276
      if (failedMessages) {
277
        // process any messages we were not able to process last time
278
        messages = failedMessages;
279
        failedMessages = boost::shared_ptr<logentry_vector_t>();
280
      } else if (msgQueueSize > 0) {
281
        // process message in queue
282
        messages = msgQueue;
283
        msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);
284
        msgQueueSize = 0;
285
      }
286
 
287
      // reset timer
288
      last_handle_messages = this_loop;
289
    }
290
 
291
    pthread_mutex_unlock(&msgMutex);
292
 
293
    if (messages) {
294
      if (!store->handleMessages(messages)) {
295
        // Store could not handle these messages
296
        processFailedMessages(messages);
297
      }
298
      store->flush();
299
    }
300
 
301
    if (!stop) {
302
      // set timeout to when we need to handle messages or do a periodic check
303
      abs_timeout.tv_sec = min(last_periodic_check + checkPeriod,
304
                               last_handle_messages + maxWriteInterval);
305
 
306
      // must wait until after this time
307
      abs_timeout.tv_sec++;
308
 
309
      // wait until there's some work to do or we timeout
310
      pthread_mutex_lock(&hasWorkMutex);
311
      if (!hasWork) {
312
	pthread_cond_timedwait(&hasWorkCond, &hasWorkMutex, &abs_timeout);
313
      }
314
      hasWork = false;
315
      pthread_mutex_unlock(&hasWorkMutex);
316
    }
317
 
318
  } // while (!stop)
319
 
320
  store->close();
321
}
322
 
323
void StoreQueue::processFailedMessages(shared_ptr<logentry_vector_t> messages) {
324
  // If the store was not able to process these messages, we will either
325
  // requeue them or give up depending on the value of mustSucceed
326
 
327
  if (mustSucceed) {
328
    // Save failed messages
329
    failedMessages = messages;
330
 
331
    LOG_OPER("[%s] WARNING: Re-queueing %lu messages!",
332
             categoryHandled.c_str(), messages->size());
333
    g_Handler->incrementCounter("requeue", messages->size());
334
  } else {
335
    // record messages as being lost
336
    LOG_OPER("[%s] WARNING: Lost %lu messages!",
337
             categoryHandled.c_str(), messages->size());
338
    g_Handler->incrementCounter("lost", messages->size());
339
  }
340
}
341
 
342
void StoreQueue::storeInitCommon() {
343
  // model store doesn't need this stuff
344
  if (!isModel) {
345
    msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);
346
    pthread_mutex_init(&cmdMutex, NULL);
347
    pthread_mutex_init(&msgMutex, NULL);
348
    pthread_mutex_init(&hasWorkMutex, NULL);
349
    pthread_cond_init(&hasWorkCond, NULL);
350
 
351
    pthread_create(&storeThread, NULL, threadStatic, (void*) this);
352
  }
353
}
354
 
355
void StoreQueue::configureInline(pStoreConf configuration) {
356
  // Constructor defaults are fine if these don't exist
357
  configuration->getUnsigned("target_write_size", (unsigned long&) targetWriteSize);
358
  configuration->getUnsigned("max_write_interval", (unsigned long&) maxWriteInterval);
359
 
360
  string tmp;
361
  if (configuration->getString("must_succeed", tmp) && tmp == "no") {
362
    mustSucceed = false;
363
  }
364
 
365
  store->configure(configuration);
366
}
367
 
368
void StoreQueue::openInline() {
369
  if (store->isOpen()) {
370
    store->close();
371
  }
372
  if (!isModel) {
373
    store->open();
374
  }
375
}