Rev 301 | Blame | Compare with Previous | Last modification | View Log | RSS feed
// Copyright (c) 2007-2008 Facebook//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.//// See accompanying file LICENSE or visit the Scribe site at:// http://developers.facebook.com/scribe///// @author Bobby Johnson// @author James Wang// @author Jason Sobel// @author Anthony Giardullo#include "common.h"#include "scribe_server.h"using namespace std;using namespace boost;using namespace scribe::thrift;#define DEFAULT_TARGET_WRITE_SIZE 16384#define DEFAULT_MAX_WRITE_INTERVAL 10void* threadStatic(void *this_ptr) {StoreQueue *queue_ptr = (StoreQueue*)this_ptr;queue_ptr->threadMember();return NULL;}StoreQueue::StoreQueue(const string& type, const string& category,unsigned check_period, bool is_model, bool multi_category): msgQueueSize(0),hasWork(false),stopping(false),isModel(is_model),multiCategory(multi_category),categoryHandled(category),checkPeriod(check_period),targetWriteSize(DEFAULT_TARGET_WRITE_SIZE),maxWriteInterval(DEFAULT_MAX_WRITE_INTERVAL),mustSucceed(true) {store = Store::createStore(type, category, false, multiCategory);if (!store) {throw std::runtime_error("createStore failed in StoreQueue constructor. Invalid type?");}storeInitCommon();}StoreQueue::StoreQueue(const shared_ptr<StoreQueue> example,const std::string &category): msgQueueSize(0),hasWork(false),stopping(false),isModel(false),multiCategory(example->multiCategory),categoryHandled(category),checkPeriod(example->checkPeriod),targetWriteSize(example->targetWriteSize),maxWriteInterval(example->maxWriteInterval),mustSucceed(example->mustSucceed) {store = example->copyStore(category);if (!store) {throw std::runtime_error("createStore failed copying model store");}storeInitCommon();}StoreQueue::~StoreQueue() {if (!isModel) {pthread_mutex_destroy(&cmdMutex);pthread_mutex_destroy(&msgMutex);pthread_mutex_destroy(&hasWorkMutex);pthread_cond_destroy(&hasWorkCond);}}// WARNING: the number could change after you check this, so don't// expect it to be exact. Use for hueristics ONLY.unsigned long StoreQueue::getSize() {unsigned long retval;pthread_mutex_lock(&msgMutex);retval = msgQueueSize;pthread_mutex_unlock(&msgMutex);return retval;}void StoreQueue::addMessage(boost::shared_ptr<LogEntry> entry) {if (isModel) {LOG_OPER("ERROR: called addMessage on model store");} else {bool waitForWork = false;pthread_mutex_lock(&msgMutex);msgQueue->push_back(entry);msgQueueSize += entry->message.size();waitForWork = (msgQueueSize >= targetWriteSize) ? true : false;pthread_mutex_unlock(&msgMutex);// Wake up store thread if we have enough messagesif (waitForWork == true) {// signal that there is work to do if not already signaledpthread_mutex_lock(&hasWorkMutex);if (!hasWork) {hasWork = true;pthread_cond_signal(&hasWorkCond);}pthread_mutex_unlock(&hasWorkMutex);}}}void StoreQueue::configureAndOpen(pStoreConf configuration) {// model store has to handle this inline since it has no queueif (isModel) {configureInline(configuration);} else {pthread_mutex_lock(&cmdMutex);StoreCommand cmd(CMD_CONFIGURE, configuration);cmdQueue.push(cmd);pthread_mutex_unlock(&cmdMutex);// signal that there is work to do if not already signaledpthread_mutex_lock(&hasWorkMutex);if (!hasWork) {hasWork = true;pthread_cond_signal(&hasWorkCond);}pthread_mutex_unlock(&hasWorkMutex);}}void StoreQueue::stop() {if (isModel) {LOG_OPER("ERROR: called stop() on model store");} else if(!stopping) {pthread_mutex_lock(&cmdMutex);StoreCommand cmd(CMD_STOP);cmdQueue.push(cmd);stopping = true;pthread_mutex_unlock(&cmdMutex);// signal that there is work to do if not already signaledpthread_mutex_lock(&hasWorkMutex);if (!hasWork) {hasWork = true;pthread_cond_signal(&hasWorkCond);}pthread_mutex_unlock(&hasWorkMutex);pthread_join(storeThread, NULL);}}void StoreQueue::open() {if (isModel) {LOG_OPER("ERROR: called open() on model store");} else {pthread_mutex_lock(&cmdMutex);StoreCommand cmd(CMD_OPEN);cmdQueue.push(cmd);pthread_mutex_unlock(&cmdMutex);// signal that there is work to do if not already signaledpthread_mutex_lock(&hasWorkMutex);if (!hasWork) {hasWork = true;pthread_cond_signal(&hasWorkCond);}pthread_mutex_unlock(&hasWorkMutex);}}shared_ptr<Store> StoreQueue::copyStore(const std::string &category) {return store->copy(category);}std::string StoreQueue::getCategoryHandled() {return categoryHandled;}std::string StoreQueue::getStatus() {return store->getStatus();}std::string StoreQueue::getBaseType() {return store->getType();}void StoreQueue::threadMember() {LOG_OPER("store thread starting");if (isModel) {LOG_OPER("ERROR: store thread starting on model store, exiting");return;}if (!store) {LOG_OPER("store is NULL, store thread exiting");return;}// init time of last periodic check to time of 0time_t last_periodic_check = 0;time_t last_handle_messages;time(&last_handle_messages);// initialize absolute timestampstruct timespec abs_timeout;memset(&abs_timeout, 0, sizeof(struct timespec));bool stop = false;bool open = false;while (!stop) {// handle commands//pthread_mutex_lock(&cmdMutex);while (!cmdQueue.empty()) {StoreCommand cmd = cmdQueue.front();cmdQueue.pop();switch (cmd.command) {case CMD_CONFIGURE:configureInline(cmd.configuration);openInline();open = true;break;case CMD_OPEN:openInline();open = true;break;case CMD_STOP:stop = true;break;default:LOG_OPER("LOGIC ERROR: unknown command to store queue");break;}}// handle periodic tasks//time_t this_loop;time(&this_loop);if (!stop && open && this_loop - last_periodic_check > checkPeriod) {store->periodicCheck();last_periodic_check = this_loop;}pthread_mutex_lock(&msgMutex);pthread_mutex_unlock(&cmdMutex);boost::shared_ptr<logentry_vector_t> messages;// handle messages if stopping, enough time has passed, or queue is large//if (stop ||(this_loop - last_handle_messages > maxWriteInterval) ||msgQueueSize >= targetWriteSize) {if (failedMessages) {// process any messages we were not able to process last timemessages = failedMessages;failedMessages = boost::shared_ptr<logentry_vector_t>();} else if (msgQueueSize > 0) {// process message in queuemessages = msgQueue;msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);msgQueueSize = 0;}// reset timerlast_handle_messages = this_loop;}pthread_mutex_unlock(&msgMutex);if (messages) {if (!store->handleMessages(messages)) {// Store could not handle these messagesprocessFailedMessages(messages);}store->flush();}if (!stop) {// set timeout to when we need to handle messages or do a periodic checkabs_timeout.tv_sec = min(last_periodic_check + checkPeriod,last_handle_messages + maxWriteInterval);// must wait until after this timeabs_timeout.tv_sec++;// wait until there's some work to do or we timeoutpthread_mutex_lock(&hasWorkMutex);if (!hasWork) {pthread_cond_timedwait(&hasWorkCond, &hasWorkMutex, &abs_timeout);}hasWork = false;pthread_mutex_unlock(&hasWorkMutex);}} // while (!stop)store->close();}void StoreQueue::processFailedMessages(shared_ptr<logentry_vector_t> messages) {// If the store was not able to process these messages, we will either// requeue them or give up depending on the value of mustSucceedif (mustSucceed) {// Save failed messagesfailedMessages = messages;LOG_OPER("[%s] WARNING: Re-queueing %lu messages!",categoryHandled.c_str(), messages->size());g_Handler->incrementCounter("requeue", messages->size());} else {// record messages as being lostLOG_OPER("[%s] WARNING: Lost %lu messages!",categoryHandled.c_str(), messages->size());g_Handler->incrementCounter("lost", messages->size());}}void StoreQueue::storeInitCommon() {// model store doesn't need this stuffif (!isModel) {msgQueue = boost::shared_ptr<logentry_vector_t>(new logentry_vector_t);pthread_mutex_init(&cmdMutex, NULL);pthread_mutex_init(&msgMutex, NULL);pthread_mutex_init(&hasWorkMutex, NULL);pthread_cond_init(&hasWorkCond, NULL);pthread_create(&storeThread, NULL, threadStatic, (void*) this);}}void StoreQueue::configureInline(pStoreConf configuration) {// Constructor defaults are fine if these don't existconfiguration->getUnsigned("target_write_size", (unsigned long&) targetWriteSize);configuration->getUnsigned("max_write_interval", (unsigned long&) maxWriteInterval);string tmp;if (configuration->getString("must_succeed", tmp) && tmp == "no") {mustSucceed = false;}store->configure(configuration);}void StoreQueue::openInline() {if (store->isOpen()) {store->close();}if (!isModel) {store->open();}}