// Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing,* software distributed under the License is distributed on an* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY* KIND, either express or implied. See the License for the* specific language governing permissions and limitations* under the License.*/#include <sys/time.h>#include "FacebookBase.h"#include "ServiceTracker.h"#include "concurrency/ThreadManager.h"using namespace std;using namespace facebook::fb303;using namespace apache::thrift::concurrency;uint64_t ServiceTracker::CHECKPOINT_MINIMUM_INTERVAL_SECONDS = 60;int ServiceTracker::LOG_LEVEL = 5;ServiceTracker::ServiceTracker(facebook::fb303::FacebookBase *handler,void (*logMethod)(int, const string &),bool featureCheckpoint,bool featureStatusCheck,bool featureThreadCheck,Stopwatch::Unit stopwatchUnit): handler_(handler), logMethod_(logMethod),featureCheckpoint_(featureCheckpoint),featureStatusCheck_(featureStatusCheck),featureThreadCheck_(featureThreadCheck),stopwatchUnit_(stopwatchUnit),checkpointServices_(0){if (featureCheckpoint_) {time_t now = time(NULL);checkpointTime_ = now;} else {checkpointTime_ = 0;}}/*** Registers the beginning of a "service method": basically, any of* the implementations of Thrift remote procedure calls that a* FacebookBase handler is handling. 
Controls concurrent* services and reports statistics (via log and via fb303 counters).* Throws an exception if the server is not ready to handle service* methods yet.** note: The relationship between startService() and finishService()* is currently defined so that a call to finishService() should only* be matched to this call to startService() if this method returns* without exception. It wouldn't be a problem to implement things* the other way, so that *every* start needed a finish, but this* convention was chosen to match the way an object's constructor and* destructor work together, i.e. to work well with ServiceMethod* objects.** @param const ServiceMethod &serviceMethod A reference to the ServiceMethod* object instantiated at the start* of the service method.*/voidServiceTracker::startService(const ServiceMethod &serviceMethod){// note: serviceMethod.timer_ automatically starts at construction.// log service startlogMethod_(5, serviceMethod.signature_);// check handler readyif (featureStatusCheck_ && !serviceMethod.featureLogOnly_) {// note: Throwing exceptions before counting statistics. 
See note// in method header.// note: A STOPPING server is not accepting new connections, but it// is still handling any already-connected threads -- so from the// service method's point of view, a status of STOPPING is a green// light.facebook::fb303::fb_status status = handler_->getStatus();if (status != facebook::fb303::ALIVE&& status != facebook::fb303::STOPPING) {if (status == facebook::fb303::STARTING) {throw ServiceException("Server starting up; please try again later");} else {throw ServiceException("Server not alive; please try again later");}}}// check server threadsif (featureThreadCheck_ && !serviceMethod.featureLogOnly_) {// note: Might want to put these messages in reportCheckpoint() if// log is getting spammed.if (threadManager_ != NULL) {size_t idle_count = threadManager_->idleWorkerCount();if (idle_count == 0) {stringstream message;message << "service " << serviceMethod.signature_<< ": all threads (" << threadManager_->workerCount()<< ") in use";logMethod_(3, message.str());}}}}/*** Logs a significant step in the middle of a "service method"; see* startService.** @param const ServiceMethod &serviceMethod A reference to the ServiceMethod* object instantiated at the start* of the service method.* @return int64_t Elapsed units (see stopwatchUnit_) since ServiceMethod* instantiation.*/int64_tServiceTracker::stepService(const ServiceMethod &serviceMethod,const string &stepName){stringstream message;string elapsed_label;int64_t elapsed = serviceMethod.timer_.elapsedUnits(stopwatchUnit_,&elapsed_label);message << serviceMethod.signature_<< ' ' << stepName<< " [" << elapsed_label << ']';logMethod_(5, message.str());return elapsed;}/*** Registers the end of a "service method"; see startService().** @param const ServiceMethod &serviceMethod A reference to the ServiceMethod* object instantiated at the start* of the service method.*/voidServiceTracker::finishService(const ServiceMethod &serviceMethod){// log end of servicestringstream message;string 
duration_label;int64_t duration = serviceMethod.timer_.elapsedUnits(stopwatchUnit_,&duration_label);message << serviceMethod.signature_<< " finish [" << duration_label << ']';logMethod_(5, message.str());// count, record, and maybe report service statisticsif (!serviceMethod.featureLogOnly_) {if (!featureCheckpoint_) {// lifetime counters// (note: No need to lock statisticsMutex_ if not doing checkpoint;// FacebookService::incrementCounter() is already thread-safe.)handler_->incrementCounter("lifetime_services");} else {statisticsMutex_.lock();// note: No exceptions expected from this code block. Wrap in a try// just to be safe.try {// lifetime counters// note: Good to synchronize this with the increment of// checkpoint services, even though incrementCounter() is// already thread-safe, for the sake of checkpoint reporting// consistency (i.e. since the last checkpoint,// lifetime_services has incremented by checkpointServices_).handler_->incrementCounter("lifetime_services");// checkpoint counterscheckpointServices_++;checkpointDuration_ += duration;// per-service timing// note kjv: According to my tests it is very slightly faster to// call insert() once (and detect not-found) than calling find()// and then maybe insert (if not-found). However, the difference// is tiny for small maps like this one, and the code for the// faster solution is slightly less readable. 
Also, I wonder if// the instantiation of the (often unused) pair to insert makes// the first algorithm slower after all.map<string, pair<uint64_t, uint64_t> >::iterator iter;iter = checkpointServiceDuration_.find(serviceMethod.name_);if (iter != checkpointServiceDuration_.end()) {iter->second.first++;iter->second.second += duration;} else {checkpointServiceDuration_.insert(make_pair(serviceMethod.name_,make_pair(1, duration)));}// maybe report checkpoint// note: ...if it's been long enough since the last report.time_t now = time(NULL);uint64_t check_interval = now - checkpointTime_;if (check_interval >= CHECKPOINT_MINIMUM_INTERVAL_SECONDS) {reportCheckpoint();}} catch (...) {statisticsMutex_.unlock();throw;}statisticsMutex_.unlock();}}}/*** Logs some statistics gathered since the last call to this method.** note: Thread race conditions on this method could cause* misreporting and/or undefined behavior; the caller must protect* uses of the object variables (and calls to this method) with a* mutex.**/voidServiceTracker::reportCheckpoint(){time_t now = time(NULL);uint64_t check_count = checkpointServices_;uint64_t check_interval = now - checkpointTime_;uint64_t check_duration = checkpointDuration_;// export counters for timing of service methods (by service name)handler_->setCounter("checkpoint_time", check_interval);map<string, pair<uint64_t, uint64_t> >::iterator iter;uint64_t count;for (iter = checkpointServiceDuration_.begin();iter != checkpointServiceDuration_.end();iter++) {count = iter->second.first;handler_->setCounter(string("checkpoint_count_") + iter->first, count);if (count == 0) {handler_->setCounter(string("checkpoint_speed_") + iter->first,0);} else {handler_->setCounter(string("checkpoint_speed_") + iter->first,iter->second.second / count);}}// reset checkpoint variables// note: Clearing the map while other threads are using it might// cause undefined behavior.checkpointServiceDuration_.clear();checkpointTime_ = now;checkpointServices_ = 
0;checkpointDuration_ = 0;// get lifetime variablesuint64_t life_count = handler_->getCounter("lifetime_services");uint64_t life_interval = now - handler_->aliveSince();// log checkpointstringstream message;message << "checkpoint_time:" << check_interval<< " checkpoint_services:" << check_count<< " checkpoint_speed_sum:" << check_duration<< " lifetime_time:" << life_interval<< " lifetime_services:" << life_count;if (featureThreadCheck_ && threadManager_ != NULL) {size_t worker_count = threadManager_->workerCount();size_t idle_count = threadManager_->idleWorkerCount();message << " total_workers:" << worker_count<< " active_workers:" << (worker_count - idle_count);}logMethod_(4, message.str());}/*** Remembers the thread manager used in the server, for monitoring thread* activity.** @param shared_ptr<ThreadManager> threadManager The server's thread manager.*/voidServiceTracker::setThreadManager(boost::shared_ptr<ThreadManager>threadManager){threadManager_ = threadManager;}/*** Logs messages to stdout; the passed message will be logged if the* passed level is less than or equal to LOG_LEVEL.** This is the default logging method used by the ServiceTracker. 
An* alternate logging method (that accepts the same parameters) may be* specified to the constructor.** @param int level A level associated with the message: higher levels* are used to indicate higher levels of detail.* @param string message The message to log.*/voidServiceTracker::defaultLogMethod(int level, const string &message){if (level <= LOG_LEVEL) {string level_string;time_t now = time(NULL);char now_pretty[26];ctime_r(&now, now_pretty);now_pretty[24] = '\0';switch (level) {case 1:level_string = "CRITICAL";break;case 2:level_string = "ERROR";break;case 3:level_string = "WARNING";break;case 5:level_string = "DEBUG";break;case 4:default:level_string = "INFO";break;}cout << '[' << level_string << "] [" << now_pretty << "] "<< message << endl;}}/*** Creates a Stopwatch, which can report the time elapsed since its* creation.**/Stopwatch::Stopwatch(){gettimeofday(&startTime_, NULL);}voidStopwatch::reset(){gettimeofday(&startTime_, NULL);}uint64_tStopwatch::elapsedUnits(Stopwatch::Unit unit, string *label) const{timeval now_time;gettimeofday(&now_time, NULL);time_t duration_secs = now_time.tv_sec - startTime_.tv_sec;uint64_t duration_units;switch (unit) {case UNIT_SECONDS:duration_units = duration_secs+ (now_time.tv_usec - startTime_.tv_usec + 500000) / 1000000;if (NULL != label) {stringstream ss_label;ss_label << duration_units << " secs";label->assign(ss_label.str());}break;case UNIT_MICROSECONDS:duration_units = duration_secs * 1000000+ now_time.tv_usec - startTime_.tv_usec;if (NULL != label) {stringstream ss_label;ss_label << duration_units << " us";label->assign(ss_label.str());}break;case UNIT_MILLISECONDS:default:duration_units = duration_secs * 1000+ (now_time.tv_usec - startTime_.tv_usec + 500) / 1000;if (NULL != label) {stringstream ss_label;ss_label << duration_units << " ms";label->assign(ss_label.str());}break;}return duration_units;}/*** Creates a ServiceMethod, used for tracking a single service method* invocation (via the ServiceTracker). 
The passed name of the* ServiceMethod is used to group statistics (e.g. counts and durations)* for similar invocations; the passed signature is used to uniquely* identify the particular invocation in the log.** note: A version of this constructor is provided that automatically* forms a signature the name and a passed numeric id. Silly, sure,* but commonly used, since it often saves the caller a line or two of* code.** @param ServiceTracker *tracker The service tracker that will track this* ServiceMethod.* @param const string &name The service method name (usually independent* of service method parameters).* @param const string &signature A signature uniquely identifying the method* invocation (usually name plus parameters).*/ServiceMethod::ServiceMethod(ServiceTracker *tracker,const string &name,const string &signature,bool featureLogOnly): tracker_(tracker), name_(name), signature_(signature),featureLogOnly_(featureLogOnly){// note: timer_ automatically starts at construction.// invoke tracker to start service// note: Might throw. If it throws, then this object's destructor// won't be called, which is according to plan: finishService() is// only supposed to be matched to startService() if startService()// returns without error.tracker_->startService(*this);}ServiceMethod::ServiceMethod(ServiceTracker *tracker,const string &name,uint64_t id,bool featureLogOnly): tracker_(tracker), name_(name), featureLogOnly_(featureLogOnly){// note: timer_ automatically starts at construction.stringstream ss_signature;ss_signature << name << " (" << id << ')';signature_ = ss_signature.str();// invoke tracker to start service// note: Might throw. 
If it throws, then this object's destructor// won't be called, which is according to plan: finishService() is// only supposed to be matched to startService() if startService()// returns without error.tracker_->startService(*this);}ServiceMethod::~ServiceMethod(){// invoke tracker to finish service// note: Not expecting an exception from this code, but// finishService() might conceivably throw an out-of-memory// exception.try {tracker_->finishService(*this);} catch (...) {// don't throw}}uint64_tServiceMethod::step(const std::string &stepName){return tracker_->stepService(*this, stepName);}